[feature](Load)(step2)support nereids load job schedule (#26356)

This change integrates the new load job manager into the new job scheduling framework, so that the insert-into task can be scheduled after the broker load SQL is converted into an insert-into TVF (table value function) SQL statement.

issue: https://github.com/apache/doris/issues/24221

Now support:
1. load data via a TVF insert-into SQL statement, but only for simple loads (columns must be defined in the table); see the first sketch below
2. show load stmt (see the second sketch below)
- job id, label name, job state, time info
- simple progress
3. cancel load jobs from a database
4. enable the new load path through Config.enable_nereids_load
5. replay jobs after restarting Doris
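
A minimal, hypothetical sketch of the scheduling path for item 1: after the broker load SQL has been rewritten into an insert-into TVF statement, it is wrapped into an InsertJob and registered with the job manager, which runs it as an INSTANT job. Only constructors and methods that appear in this change are used; the label, database name, and converted SQL text are placeholders, and an active ConnectContext on the FE thread is assumed.

```java
// Hypothetical sketch, not part of this patch: submit a converted load as an INSTANT job.
import org.apache.doris.catalog.Env;
import org.apache.doris.job.base.JobExecuteType;
import org.apache.doris.job.base.JobExecutionConfiguration;
import org.apache.doris.job.common.JobStatus;
import org.apache.doris.job.exception.JobException;
import org.apache.doris.job.extensions.insert.InsertJob;
import org.apache.doris.job.manager.JobManager;
import org.apache.doris.qe.ConnectContext;

public class NereidsLoadSubmitSketch {
    @SuppressWarnings({"unchecked", "rawtypes"})
    public static void submit(String label, String dbName, String insertTvfSql) throws JobException {
        // INSTANT: the job is executed only once, right after registration
        JobExecutionConfiguration config = new JobExecutionConfiguration();
        config.setExecuteType(JobExecuteType.INSTANT);

        // new InsertJob constructor added in this commit; it reads the current db id
        // from ConnectContext, so an active ConnectContext is assumed here
        InsertJob job = new InsertJob(label, JobStatus.RUNNING, dbName,
                "broker load converted to insert into TVF",
                ConnectContext.get().getCurrentUserIdentity(), config,
                System.currentTimeMillis(), insertTvfSql);

        // registerJob checks params, writes an edit log entry (so the job can be replayed
        // after an FE restart) and hands the job to the scheduler
        JobManager jobManager = Env.getCurrentEnv().getJobManager();
        jobManager.registerJob(job);
    }
}
```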

TODO:
- support partitioned insert jobs
- support showing statistics from BE
- support multiple tasks and collect per-task statistics
- support transactional tasks
- add unit test cases
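
A second hypothetical sketch for items 2 and 3: SHOW LOAD information for the new jobs is served by JobManager.getLoadJobInfosByDb, which is added in this change. The dbId/dbName would come from the current ConnectContext; accurateMatch=true is assumed to mean an exact label match rather than a LIKE pattern, and a null state means no state filter.

```java
// Hypothetical sketch, not part of this patch: list SHOW LOAD rows for one database.
import org.apache.doris.catalog.Env;
import org.apache.doris.common.AnalysisException;

import java.util.List;

public class ShowNereidsLoadSketch {
    public static List<List<Comparable>> listLoads(long dbId, String dbName, String label)
            throws AnalysisException {
        // passing null for the state filter returns jobs in any state
        return Env.getCurrentEnv().getJobManager()
                .getLoadJobInfosByDb(dbId, dbName, label, true, null);
    }
}
```

Cancelling goes through JobManager.cancelLoadJob(CancelLoadStmt), which filters running jobs by label/state and unregisters each match, as shown in the JobManager diff below.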
Committed by slothever on 2023-12-26 12:29:05 +08:00 (via GitHub)
commit 509cfea99a (parent b8fd55b0cf)
33 changed files with 1329 additions and 379 deletions


@ -117,10 +117,8 @@ public class CreateJobStmt extends DdlStmt {
analyzerSqlStmt();
// check its insert stmt,currently only support insert stmt
//todo when support other stmt,need to check stmt type and generate jobInstance
InsertJob job = new InsertJob();
JobExecutionConfiguration jobExecutionConfiguration = new JobExecutionConfiguration();
jobExecutionConfiguration.setExecuteType(executeType);
job.setCreateTimeMs(System.currentTimeMillis());
TimerDefinition timerDefinition = new TimerDefinition();
if (null != onceJobStartTimestamp) {
@ -148,17 +146,19 @@ public class CreateJobStmt extends DdlStmt {
}
checkJobName(labelName.getLabelName());
jobExecutionConfiguration.setTimerDefinition(timerDefinition);
job.setJobConfig(jobExecutionConfiguration);
job.setComment(comment);
job.setCurrentDbName(labelName.getDbName());
job.setJobName(labelName.getLabelName());
job.setCreateUser(ConnectContext.get().getCurrentUserIdentity());
job.setJobStatus(JobStatus.RUNNING);
job.setJobId(Env.getCurrentEnv().getNextId());
String originStmt = getOrigStmt().originStmt;
String executeSql = parseExecuteSql(originStmt);
job.setExecuteSql(executeSql);
// create job use label name as its job name
String jobName = labelName.getLabelName();
InsertJob job = new InsertJob(jobName,
JobStatus.RUNNING,
labelName.getDbName(),
comment,
ConnectContext.get().getCurrentUserIdentity(),
jobExecutionConfiguration,
System.currentTimeMillis(),
executeSql);
//job.checkJobParams();
jobInstance = job;
}


@ -108,6 +108,13 @@ public class ShowLoadStmt extends ShowStmt {
return states;
}
public org.apache.doris.load.loadv2.JobState getStateV2() {
if (Strings.isNullOrEmpty(stateValue)) {
return null;
}
return org.apache.doris.load.loadv2.JobState.valueOf(stateValue);
}
public boolean isAccurateMatch() {
return isAccurateMatch;
}


@ -182,6 +182,7 @@ import org.apache.doris.mtmv.MTMVStatus;
import org.apache.doris.mysql.privilege.AccessControllerManager;
import org.apache.doris.mysql.privilege.Auth;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.jobs.load.LabelProcessor;
import org.apache.doris.nereids.trees.plans.commands.info.AlterMTMVPropertyInfo;
import org.apache.doris.nereids.trees.plans.commands.info.AlterMTMVRefreshInfo;
import org.apache.doris.nereids.trees.plans.commands.info.TableNameInfo;
@ -362,6 +363,7 @@ public class Env {
private ExportTaskRegister exportTaskRegister;
private JobManager<? extends AbstractJob<?, ?>, ?> jobManager;
private LabelProcessor labelProcessor;
private TransientTaskManager transientTaskManager;
private MasterDaemon labelCleaner; // To clean old LabelInfo, ExportJobInfos
@ -641,8 +643,11 @@ public class Env {
}
this.metastoreEventsProcessor = new MetastoreEventsProcessor();
this.jobManager = new JobManager<>();
this.labelProcessor = new LabelProcessor();
this.transientTaskManager = new TransientTaskManager();
this.exportTaskRegister = new ExportTaskRegister(transientTaskManager);
this.transientTaskManager = new TransientTaskManager();
this.replayedJournalId = new AtomicLong(0L);
this.stmtIdCounter = new AtomicLong(0L);
this.isElectable = false;
@ -3907,6 +3912,10 @@ public class Env {
return jobManager;
}
public LabelProcessor getLabelProcessor() {
return labelProcessor;
}
public TransientTaskManager getTransientTaskManager() {
return transientTaskManager;
}


@ -23,6 +23,8 @@ import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.LogBuilder;
import org.apache.doris.common.util.LogKey;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.job.common.JobStatus;
import org.apache.doris.job.common.TaskStatus;
@ -76,13 +78,54 @@ public abstract class AbstractJob<T extends AbstractTask, C> implements Job<T, C
private JobExecutionConfiguration jobConfig;
@SerializedName(value = "ctms")
private Long createTimeMs;
private long createTimeMs;
@SerializedName(value = "stm")
private long startTimeMs = -1L;
@SerializedName(value = "ftm")
private long finishTimeMs;
@SerializedName(value = "sql")
String executeSql;
@SerializedName(value = "ftm")
private long finishTimeMs;
public AbstractJob() {}
public AbstractJob(Long id) {
jobId = id;
}
/**
* executeSql and runningTasks are not required for load.
*/
public AbstractJob(Long jobId, String jobName, JobStatus jobStatus,
String currentDbName,
String comment,
UserIdentity createUser,
JobExecutionConfiguration jobConfig) {
this(jobId, jobName, jobStatus, currentDbName, comment,
createUser, jobConfig, System.currentTimeMillis(), null, null);
}
public AbstractJob(Long jobId, String jobName, JobStatus jobStatus,
String currentDbName,
String comment,
UserIdentity createUser,
JobExecutionConfiguration jobConfig,
Long createTimeMs,
String executeSql,
List<T> runningTasks) {
this.jobId = jobId;
this.jobName = jobName;
this.jobStatus = jobStatus;
this.currentDbName = currentDbName;
this.comment = comment;
this.createUser = createUser;
this.jobConfig = jobConfig;
this.createTimeMs = createTimeMs;
this.executeSql = executeSql;
this.runningTasks = runningTasks;
}
private List<T> runningTasks = new ArrayList<>();
@ -109,6 +152,10 @@ public abstract class AbstractJob<T extends AbstractTask, C> implements Job<T, C
.add("Comment")
.build();
protected static long getNextJobId() {
return System.nanoTime() + RandomUtils.nextInt();
}
@Override
public void cancelTaskById(long taskId) throws JobException {
if (CollectionUtils.isEmpty(runningTasks)) {
@ -154,17 +201,18 @@ public abstract class AbstractJob<T extends AbstractTask, C> implements Job<T, C
return createTasks(taskType, taskContext);
}
public void initTasks(List<? extends AbstractTask> tasks) {
public void initTasks(Collection<? extends T> tasks, TaskType taskType) {
if (CollectionUtils.isEmpty(getRunningTasks())) {
runningTasks = new ArrayList<>();
}
tasks.forEach(task -> {
task.setJobId(jobId);
task.setTaskId(getNextId());
task.setTaskType(taskType);
task.setJobId(getJobId());
task.setCreateTimeMs(System.currentTimeMillis());
task.setStatus(TaskStatus.PENDING);
});
if (CollectionUtils.isEmpty(getRunningTasks())) {
setRunningTasks(new ArrayList<>());
}
getRunningTasks().addAll((Collection<? extends T>) tasks);
getRunningTasks().addAll(tasks);
this.startTimeMs = System.currentTimeMillis();
}
public void checkJobParams() {
@ -208,10 +256,22 @@ public abstract class AbstractJob<T extends AbstractTask, C> implements Job<T, C
public static AbstractJob readFields(DataInput in) throws IOException {
String jsonJob = Text.readString(in);
AbstractJob job = GsonUtils.GSON.fromJson(jsonJob, AbstractJob.class);
job.setRunningTasks(new ArrayList<>());
job.runningTasks = new ArrayList<>();
return job;
}
public void logCreateOperation() {
Env.getCurrentEnv().getEditLog().logCreateJob(this);
}
public void logFinalOperation() {
Env.getCurrentEnv().getEditLog().logEndJob(this);
}
public void logUpdateOperation() {
Env.getCurrentEnv().getEditLog().logUpdateJob(this);
}
@Override
public void onTaskFail(T task) throws JobException {
updateJobStatusIfEnd();
@ -303,7 +363,19 @@ public abstract class AbstractJob<T extends AbstractTask, C> implements Job<T, C
return builder.build();
}
private static long getNextId() {
return System.nanoTime() + RandomUtils.nextInt();
@Override
public void onRegister() throws JobException {}
@Override
public void onUnRegister() throws JobException {}
@Override
public void onReplayCreate() throws JobException {
log.info(new LogBuilder(LogKey.SCHEDULER_JOB, getJobId()).add("msg", "replay create scheduler job").build());
}
@Override
public void onReplayEnd(AbstractJob<?, C> replayJob) throws JobException {
log.info(new LogBuilder(LogKey.SCHEDULER_JOB, getJobId()).add("msg", "replay delete scheduler job").build());
}
}


@ -99,11 +99,34 @@ public interface Job<T extends AbstractTask, C> {
/**
* Cancels all running tasks of this job.
*
* @throws JobException If cancelling a running task fails.
*/
void cancelAllTasks() throws JobException;
/**
* register job
* @throws JobException If register job failed.
*/
void onRegister() throws JobException;
/**
* unregister job
* @throws JobException If failed.
*/
void onUnRegister() throws JobException;
/**
* replay create job
* @throws JobException If replay create failed.
*/
void onReplayCreate() throws JobException;
/**
* replay finished or cancelled job
* @throws JobException If replay end failed.
*/
void onReplayEnd(AbstractJob<?, C> replayJob) throws JobException;
/**
* Notifies the job when a task execution fails.
*


@ -37,7 +37,7 @@ public enum JobExecuteType {
*/
MANUAL,
/**
* The job will be executed immediately.
* The job will be executed only once and immediately.
*/
INSTANT,
}


@ -36,7 +36,6 @@ public enum JobStatus {
* The stop state cannot be resumed
*/
STOPPED,
/**
* When the task is finished, the finished state will be triggered.
*/


@ -19,5 +19,5 @@ package org.apache.doris.job.common;
public enum JobType {
INSERT,
MV
MV,
}


@ -17,43 +17,74 @@
package org.apache.doris.job.extensions.insert;
import org.apache.doris.analysis.LoadStmt;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.AuthorizationInfo;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.common.Config;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.LabelAlreadyUsedException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.util.LogBuilder;
import org.apache.doris.common.util.LogKey;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.job.base.AbstractJob;
import org.apache.doris.job.base.JobExecuteType;
import org.apache.doris.job.base.JobExecutionConfiguration;
import org.apache.doris.job.common.JobStatus;
import org.apache.doris.job.common.JobType;
import org.apache.doris.job.common.TaskType;
import org.apache.doris.job.exception.JobException;
import org.apache.doris.load.loadv2.LoadJob;
import org.apache.doris.load.FailMsg;
import org.apache.doris.load.loadv2.LoadStatistic;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.mysql.privilege.Privilege;
import org.apache.doris.nereids.trees.plans.commands.InsertIntoTableCommand;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.ShowResultSetMetaData;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.transaction.ErrorTabletInfo;
import org.apache.doris.transaction.TabletCommitInfo;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.annotations.SerializedName;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.Collectors;
@EqualsAndHashCode(callSuper = true)
@Data
@Slf4j
public class InsertJob extends AbstractJob<InsertTask, Map> {
public class InsertJob extends AbstractJob<InsertTask, Map<Object, Object>> {
public static final ImmutableList<Column> SCHEMA = ImmutableList.of(
new Column("Id", ScalarType.createStringType()),
@ -66,40 +97,145 @@ public class InsertJob extends AbstractJob<InsertTask, Map> {
new Column("CreateTime", ScalarType.createStringType()),
new Column("Comment", ScalarType.createStringType()));
private static final ShowResultSetMetaData TASK_META_DATA =
ShowResultSetMetaData.builder()
.addColumn(new Column("TaskId", ScalarType.createVarchar(80)))
.addColumn(new Column("Label", ScalarType.createVarchar(80)))
.addColumn(new Column("Status", ScalarType.createVarchar(20)))
.addColumn(new Column("EtlInfo", ScalarType.createVarchar(100)))
.addColumn(new Column("TaskInfo", ScalarType.createVarchar(100)))
.addColumn(new Column("ErrorMsg", ScalarType.createVarchar(100)))
.addColumn(new Column("CreateTimeMs", ScalarType.createVarchar(20)))
.addColumn(new Column("FinishTimeMs", ScalarType.createVarchar(20)))
.addColumn(new Column("TrackingUrl", ScalarType.createVarchar(200)))
.addColumn(new Column("LoadStatistic", ScalarType.createVarchar(200)))
.addColumn(new Column("User", ScalarType.createVarchar(50)))
.build();
public static final ImmutableMap<String, Integer> COLUMN_TO_INDEX;
static {
ImmutableMap.Builder<String, Integer> builder = new ImmutableMap.Builder();
ImmutableMap.Builder<String, Integer> builder = new ImmutableMap.Builder<>();
for (int i = 0; i < SCHEMA.size(); i++) {
builder.put(SCHEMA.get(i).getName().toLowerCase(), i);
}
COLUMN_TO_INDEX = builder.build();
}
@SerializedName(value = "lp")
String labelPrefix;
InsertIntoTableCommand command;
StmtExecutor stmtExecutor;
ConnectContext ctx;
@SerializedName("tis")
ConcurrentLinkedQueue<Long> historyTaskIdList;
@SerializedName("did")
private final long dbId;
@SerializedName("ln")
private String labelName;
@SerializedName("lt")
private InsertJob.LoadType loadType;
// 0: the job is pending
// n/100: n is the number of tasks that have finished
// 99: all tasks have finished
// 100: txn status is visible and the load has finished
@SerializedName("pg")
private int progress;
@SerializedName("fm")
private FailMsg failMsg;
@SerializedName("plans")
private List<InsertIntoTableCommand> plans = new ArrayList<>();
private LoadStatistic loadStatistic = new LoadStatistic();
private Set<Long> finishedTaskIds = new HashSet<>();
private ConcurrentHashMap<Long, InsertTask> idToTasks = new ConcurrentHashMap<>();
private Map<String, String> properties = new HashMap<>();
private Set<String> tableNames;
private AuthorizationInfo authorizationInfo;
private ConnectContext ctx;
private StmtExecutor stmtExecutor;
private List<ErrorTabletInfo> errorTabletInfos = new ArrayList<>();
private List<TabletCommitInfo> commitInfos = new ArrayList<>();
// max save task num, do we need to config it?
private static final int MAX_SAVE_TASK_NUM = 100;
/**
* load job type
*/
public enum LoadType {
BULK,
SPARK,
LOCAL_FILE,
UNKNOWN
}
public enum Priority {
HIGH(0),
NORMAL(1),
LOW(2);
Priority(int value) {
this.value = value;
}
private final int value;
public int getValue() {
return value;
}
}
public InsertJob(String jobName,
JobStatus jobStatus,
String dbName,
String comment,
UserIdentity createUser,
JobExecutionConfiguration jobConfig,
Long createTimeMs,
String executeSql) {
super(getNextJobId(), jobName, jobStatus, dbName, comment, createUser,
jobConfig, createTimeMs, executeSql, null);
this.dbId = ConnectContext.get().getCurrentDbId();
}
public InsertJob(ConnectContext ctx,
StmtExecutor executor,
String labelName,
List<InsertIntoTableCommand> plans,
Set<String> sinkTableNames,
Map<String, String> properties,
String comment,
JobExecutionConfiguration jobConfig) {
super(getNextJobId(), labelName, JobStatus.RUNNING, null,
comment, ctx.getCurrentUserIdentity(), jobConfig);
this.ctx = ctx;
this.plans = plans;
this.stmtExecutor = executor;
this.dbId = ctx.getCurrentDbId();
this.labelName = labelName;
this.tableNames = sinkTableNames;
this.properties = properties;
// TODO: not support other type yet
this.loadType = InsertJob.LoadType.BULK;
}
@Override
public List<InsertTask> createTasks(TaskType taskType, Map taskContext) {
//nothing need to do in insert job
InsertTask task = new InsertTask(null, getCurrentDbName(), getExecuteSql(), getCreateUser());
task.setJobId(getJobId());
task.setTaskType(taskType);
task.setTaskId(Env.getCurrentEnv().getNextId());
ArrayList<InsertTask> tasks = new ArrayList<>();
tasks.add(task);
super.initTasks(tasks);
recordTask(task.getTaskId());
return tasks;
public List<InsertTask> createTasks(TaskType taskType, Map<Object, Object> taskContext) {
if (plans.isEmpty()) {
InsertTask task = new InsertTask(labelName, getCurrentDbName(), getExecuteSql(), getCreateUser());
idToTasks.put(task.getTaskId(), task);
recordTask(task.getTaskId());
} else {
// use for load stmt
for (InsertIntoTableCommand logicalPlan : plans) {
if (!logicalPlan.getLabelName().isPresent()) {
throw new IllegalArgumentException("Load plan need label name.");
}
InsertTask task = new InsertTask(logicalPlan, ctx, stmtExecutor, loadStatistic);
idToTasks.put(task.getTaskId(), task);
recordTask(task.getTaskId());
}
}
initTasks(idToTasks.values(), taskType);
return new ArrayList<>(idToTasks.values());
}
public void recordTask(long id) {
@ -116,7 +252,6 @@ public class InsertJob extends AbstractJob<InsertTask, Map> {
if (historyTaskIdList.size() >= Config.max_persistence_task_count) {
historyTaskIdList.poll();
}
Env.getCurrentEnv().getEditLog().logUpdateJob(this);
}
@Override
@ -125,23 +260,27 @@ public class InsertJob extends AbstractJob<InsertTask, Map> {
}
@Override
public boolean isReadyForScheduling(Map taskContext) {
public void cancelAllTasks() throws JobException {
try {
checkAuth("CANCEL LOAD");
super.cancelAllTasks();
this.failMsg = new FailMsg(FailMsg.CancelType.USER_CANCEL, "user cancel");
} catch (DdlException e) {
throw new JobException(e);
}
}
@Override
public boolean isReadyForScheduling(Map<Object, Object> taskContext) {
return CollectionUtils.isEmpty(getRunningTasks());
}
@Override
public void cancelAllTasks() throws JobException {
super.cancelAllTasks();
}
@Override
protected void checkJobParamsInternal() {
if (command == null && StringUtils.isBlank(getExecuteSql())) {
if (plans.isEmpty() && StringUtils.isBlank(getExecuteSql())) {
throw new IllegalArgumentException("command or sql is null,must be set");
}
if (null != command && !getJobConfig().getExecuteType().equals(JobExecuteType.INSTANT)) {
if (!plans.isEmpty() && !getJobConfig().getExecuteType().equals(JobExecuteType.INSTANT)) {
throw new IllegalArgumentException("command must be null when executeType is not instant");
}
}
@ -153,27 +292,22 @@ public class InsertJob extends AbstractJob<InsertTask, Map> {
}
//TODO this will be refactored; we will store task info inside the job and query it from there
List<Long> taskIdList = new ArrayList<>(this.historyTaskIdList);
Collections.reverse(taskIdList);
List<LoadJob> loadJobs = Env.getCurrentEnv().getLoadManager().queryLoadJobsByJobIds(taskIdList);
if (CollectionUtils.isEmpty(loadJobs)) {
return queryLoadTasksByTaskIds(taskIdList);
}
public List<InsertTask> queryLoadTasksByTaskIds(List<Long> taskIdList) {
if (taskIdList.isEmpty()) {
return new ArrayList<>();
}
List<InsertTask> tasks = new ArrayList<>();
loadJobs.forEach(loadJob -> {
InsertTask task;
try {
task = new InsertTask(loadJob.getLabel(), loadJob.getDb().getFullName(), null, getCreateUser());
task.setCreateTimeMs(loadJob.getCreateTimestamp());
} catch (MetaNotFoundException e) {
log.warn("load job not found, job id is {}", loadJob.getId());
return;
List<InsertTask> jobs = new ArrayList<>();
taskIdList.forEach(id -> {
if (null != idToTasks.get(id)) {
jobs.add(idToTasks.get(id));
}
task.setJobId(getJobId());
task.setTaskId(loadJob.getId());
task.setLoadJob(loadJob);
tasks.add(task);
});
return tasks;
return jobs;
}
@Override
@ -193,6 +327,12 @@ public class InsertJob extends AbstractJob<InsertTask, Map> {
@Override
public void onTaskFail(InsertTask task) {
try {
updateJobStatus(JobStatus.STOPPED);
this.failMsg = new FailMsg(FailMsg.CancelType.LOAD_RUN_FAIL, task.getErrMsg());
} catch (JobException e) {
throw new RuntimeException(e);
}
getRunningTasks().remove(task);
}
@ -203,7 +343,129 @@ public class InsertJob extends AbstractJob<InsertTask, Map> {
@Override
public List<String> getShowInfo() {
return super.getCommonShowInfo();
try {
// check auth
checkAuth("SHOW LOAD");
List<String> jobInfo = Lists.newArrayList();
// jobId
jobInfo.add(getJobId().toString());
// label
if (StringUtils.isEmpty(getLabelName())) {
jobInfo.add(FeConstants.null_string);
} else {
jobInfo.add(getLabelName());
}
// state
if (getJobStatus() == JobStatus.STOPPED) {
jobInfo.add("CANCELLED");
} else {
jobInfo.add(getJobStatus().name());
}
// progress
String progress = Env.getCurrentProgressManager().getProgressInfo(String.valueOf(getJobId()));
switch (getJobStatus()) {
case RUNNING:
if (isPending()) {
jobInfo.add("ETL:0%; LOAD:0%");
} else {
jobInfo.add("ETL:100%; LOAD:" + progress + "%");
}
break;
case FINISHED:
jobInfo.add("ETL:100%; LOAD:100%");
break;
case STOPPED:
default:
jobInfo.add("ETL:N/A; LOAD:N/A");
break;
}
// type
jobInfo.add(loadType.name());
// etl info
if (loadStatistic.getCounters().size() == 0) {
jobInfo.add(FeConstants.null_string);
} else {
jobInfo.add(Joiner.on("; ").withKeyValueSeparator("=").join(loadStatistic.getCounters()));
}
// task info
jobInfo.add("cluster:" + getResourceName() + "; timeout(s):" + getTimeout()
+ "; max_filter_ratio:" + getMaxFilterRatio() + "; priority:" + getPriority());
// error msg
if (failMsg == null) {
jobInfo.add(FeConstants.null_string);
} else {
jobInfo.add("type:" + failMsg.getCancelType() + "; msg:" + failMsg.getMsg());
}
// create time
jobInfo.add(TimeUtils.longToTimeString(getCreateTimeMs()));
// etl start time
jobInfo.add(TimeUtils.longToTimeString(getStartTimeMs()));
// etl end time
jobInfo.add(TimeUtils.longToTimeString(getStartTimeMs()));
// load start time
jobInfo.add(TimeUtils.longToTimeString(getStartTimeMs()));
// load end time
jobInfo.add(TimeUtils.longToTimeString(getFinishTimeMs()));
// tracking urls
List<String> trackingUrl = idToTasks.values().stream()
.map(task -> {
if (StringUtils.isNotEmpty(task.getTrackingUrl())) {
return task.getTrackingUrl();
} else {
return FeConstants.null_string;
}
})
.collect(Collectors.toList());
if (trackingUrl.isEmpty()) {
jobInfo.add(FeConstants.null_string);
} else {
jobInfo.add(trackingUrl.toString());
}
// job details
jobInfo.add(loadStatistic.toJson());
// transaction id
jobInfo.add(String.valueOf(0));
// error tablets
jobInfo.add(errorTabletsToJson());
// user, some load job may not have user info
if (getCreateUser() == null || getCreateUser().getQualifiedUser() == null) {
jobInfo.add(FeConstants.null_string);
} else {
jobInfo.add(getCreateUser().getQualifiedUser());
}
// comment
jobInfo.add(getComment());
return jobInfo;
} catch (DdlException e) {
throw new RuntimeException(e);
}
}
private String getPriority() {
return properties.getOrDefault(LoadStmt.PRIORITY, Priority.NORMAL.name());
}
public double getMaxFilterRatio() {
return Double.parseDouble(properties.getOrDefault(LoadStmt.MAX_FILTER_RATIO_PROPERTY, "0.0"));
}
public long getTimeout() {
if (properties.containsKey(LoadStmt.TIMEOUT_PROPERTY)) {
return Long.parseLong(properties.get(LoadStmt.TIMEOUT_PROPERTY));
}
return Config.broker_load_default_timeout_second;
}
public static InsertJob readFields(DataInput in) throws IOException {
String jsonJob = Text.readString(in);
InsertJob job = GsonUtils.GSON.fromJson(jsonJob, InsertJob.class);
job.setRunningTasks(new ArrayList<>());
return job;
}
@Override
@ -211,19 +473,129 @@ public class InsertJob extends AbstractJob<InsertTask, Map> {
Text.writeString(out, GsonUtils.GSON.toJson(this));
}
private static final ShowResultSetMetaData TASK_META_DATA =
ShowResultSetMetaData.builder()
.addColumn(new Column("TaskId", ScalarType.createVarchar(20)))
.addColumn(new Column("Label", ScalarType.createVarchar(20)))
.addColumn(new Column("Status", ScalarType.createVarchar(20)))
.addColumn(new Column("EtlInfo", ScalarType.createVarchar(20)))
.addColumn(new Column("TaskInfo", ScalarType.createVarchar(20)))
.addColumn(new Column("ErrorMsg", ScalarType.createVarchar(20)))
public String errorTabletsToJson() {
Map<Long, String> map = new HashMap<>();
errorTabletInfos.stream().limit(Config.max_error_tablet_of_broker_load)
.forEach(p -> map.put(p.getTabletId(), p.getMsg()));
Gson gson = new GsonBuilder().disableHtmlEscaping().create();
return gson.toJson(map);
}
.addColumn(new Column("CreateTimeMs", ScalarType.createVarchar(20)))
.addColumn(new Column("FinishTimeMs", ScalarType.createVarchar(20)))
.addColumn(new Column("TrackingUrl", ScalarType.createVarchar(20)))
.addColumn(new Column("LoadStatistic", ScalarType.createVarchar(20)))
.addColumn(new Column("User", ScalarType.createVarchar(20)))
.build();
public void updateLoadingStatus(Long beId, TUniqueId loadId, TUniqueId fragmentId, long scannedRows,
long scannedBytes, boolean isDone) {
loadStatistic.updateLoadProgress(beId, loadId, fragmentId, scannedRows, scannedBytes, isDone);
progress = (int) ((double) finishedTaskIds.size() / idToTasks.size() * 100);
if (progress == 100) {
progress = 99;
}
}
private void checkAuth(String command) throws DdlException {
if (authorizationInfo == null) {
// use the old method to check priv
checkAuthWithoutAuthInfo(command);
return;
}
if (!Env.getCurrentEnv().getAccessManager().checkPrivByAuthInfo(ConnectContext.get(), authorizationInfo,
PrivPredicate.LOAD)) {
ErrorReport.reportDdlException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR,
Privilege.LOAD_PRIV);
}
}
/**
* This method is compatible with old load job without authorization info
* If db or table name could not be found by id, it will throw the NOT_EXISTS_ERROR
*
* @throws DdlException
*/
private void checkAuthWithoutAuthInfo(String command) throws DdlException {
Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(dbId);
// check auth
if (tableNames == null || tableNames.isEmpty()) {
// forward compatibility
if (!Env.getCurrentEnv().getAccessManager().checkDbPriv(ConnectContext.get(), db.getFullName(),
PrivPredicate.LOAD)) {
ErrorReport.reportDdlException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR,
Privilege.LOAD_PRIV);
}
} else {
for (String tblName : tableNames) {
if (!Env.getCurrentEnv().getAccessManager().checkTblPriv(ConnectContext.get(), db.getFullName(),
tblName, PrivPredicate.LOAD)) {
ErrorReport.reportDdlException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR,
command,
ConnectContext.get().getQualifiedUser(),
ConnectContext.get().getRemoteIP(), db.getFullName() + ": " + tblName);
}
}
}
}
public void unprotectReadEndOperation(InsertJob replayLog) {
setJobStatus(replayLog.getJobStatus());
progress = replayLog.getProgress();
setStartTimeMs(replayLog.getStartTimeMs());
setFinishTimeMs(replayLog.getFinishTimeMs());
failMsg = replayLog.getFailMsg();
}
public String getResourceName() {
// TODO: get tvf param from tvf relation
return "N/A";
}
public boolean isRunning() {
return getJobStatus() != JobStatus.FINISHED;
}
public boolean isPending() {
return getJobStatus() != JobStatus.FINISHED;
}
public boolean isCancelled() {
return getJobStatus() == JobStatus.STOPPED;
}
@Override
public void onRegister() throws JobException {
try {
if (StringUtils.isNotEmpty(labelName)) {
Env.getCurrentEnv().getLabelProcessor().addJob(this);
}
} catch (LabelAlreadyUsedException e) {
throw new JobException(e);
}
}
@Override
public void onUnRegister() throws JobException {
// TODO: need record cancelled jobs in order to show cancelled job
// Env.getCurrentEnv().getLabelProcessor().removeJob(getDbId(), getLabelName());
}
@Override
public void onReplayCreate() throws JobException {
JobExecutionConfiguration jobConfig = new JobExecutionConfiguration();
jobConfig.setExecuteType(JobExecuteType.INSTANT);
setJobConfig(jobConfig);
onRegister();
checkJobParams();
log.info(new LogBuilder(LogKey.LOAD_JOB, getJobId()).add("msg", "replay create load job").build());
}
@Override
public void onReplayEnd(AbstractJob<?, Map<Object, Object>> replayJob) throws JobException {
if (!(replayJob instanceof InsertJob)) {
return;
}
InsertJob insertJob = (InsertJob) replayJob;
unprotectReadEndOperation(insertJob);
log.info(new LogBuilder(LogKey.LOAD_JOB,
insertJob.getJobId()).add("operation", insertJob).add("msg", "replay end load job").build());
}
public int getProgress() {
return progress;
}
}


@ -25,7 +25,8 @@ import org.apache.doris.common.FeConstants;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.job.exception.JobException;
import org.apache.doris.job.task.AbstractTask;
import org.apache.doris.load.loadv2.LoadJob;
import org.apache.doris.load.FailMsg;
import org.apache.doris.load.loadv2.LoadStatistic;
import org.apache.doris.nereids.parser.NereidsParser;
import org.apache.doris.nereids.trees.plans.commands.InsertIntoTableCommand;
import org.apache.doris.qe.ConnectContext;
@ -34,12 +35,12 @@ import org.apache.doris.thrift.TCell;
import org.apache.doris.thrift.TRow;
import org.apache.doris.thrift.TUniqueId;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.log4j.Log4j2;
import org.apache.commons.lang3.StringUtils;
import java.util.Optional;
import java.util.UUID;
@ -53,8 +54,6 @@ public class InsertTask extends AbstractTask {
new Column("JobId", ScalarType.createStringType()),
new Column("Label", ScalarType.createStringType()),
new Column("Status", ScalarType.createStringType()),
new Column("EtlInfo", ScalarType.createStringType()),
new Column("TaskInfo", ScalarType.createStringType()),
new Column("ErrorMsg", ScalarType.createStringType()),
new Column("CreateTimeMs", ScalarType.createStringType()),
new Column("FinishTimeMs", ScalarType.createStringType()),
@ -73,30 +72,69 @@ public class InsertTask extends AbstractTask {
}
private String labelName;
private InsertIntoTableCommand command;
private StmtExecutor stmtExecutor;
private ConnectContext ctx;
private String sql;
private String currentDb;
private UserIdentity userIdentity;
private LoadStatistic loadStatistic;
private AtomicBoolean isCanceled = new AtomicBoolean(false);
private AtomicBoolean isFinished = new AtomicBoolean(false);
private static final String LABEL_SPLITTER = "_";
private FailMsg failMsg;
@Getter
private String trackingUrl;
@Getter
@Setter
private LoadJob loadJob;
private InsertJob jobInfo;
private TaskType taskType = TaskType.PENDING;
private MergeType mergeType = MergeType.APPEND;
/**
* task merge type
*/
enum MergeType {
MERGE,
APPEND,
DELETE
}
/**
* task type
*/
enum TaskType {
UNKNOWN, // this is only for ISSUE #2354
PENDING,
LOADING,
FINISHED,
FAILED,
CANCELLED
}
public InsertTask(InsertIntoTableCommand insertInto,
ConnectContext ctx, StmtExecutor executor, LoadStatistic statistic) {
this(insertInto.getLabelName().get(), insertInto, ctx, executor, statistic);
}
public InsertTask(String labelName, String currentDb, String sql, UserIdentity userIdentity) {
this.labelName = labelName;
this.sql = sql;
this.currentDb = currentDb;
this.userIdentity = userIdentity;
}
public InsertTask(String labelName, InsertIntoTableCommand insertInto,
ConnectContext ctx, StmtExecutor executor, LoadStatistic statistic) {
this.labelName = labelName;
this.command = insertInto;
this.userIdentity = ctx.getCurrentUserIdentity();
this.ctx = ctx;
this.stmtExecutor = executor;
this.loadStatistic = statistic;
}
@Override
public void before() throws JobException {
@ -109,15 +147,19 @@ public class InsertTask extends AbstractTask {
ctx.setCurrentUserIdentity(userIdentity);
ctx.getState().reset();
ctx.setThreadLocalInfo();
ctx.setDatabase(currentDb);
if (StringUtils.isNotEmpty(currentDb)) {
ctx.setDatabase(currentDb);
}
TUniqueId queryId = generateQueryId(UUID.randomUUID().toString());
ctx.getSessionVariable().enableFallbackToOriginalPlanner = false;
stmtExecutor = new StmtExecutor(ctx, (String) null);
ctx.setQueryId(queryId);
NereidsParser parser = new NereidsParser();
this.command = (InsertIntoTableCommand) parser.parseSingle(sql);
this.command.setLabelName(Optional.of(getJobId() + LABEL_SPLITTER + getTaskId()));
this.command.setJobId(getTaskId());
if (StringUtils.isNotEmpty(sql)) {
NereidsParser parser = new NereidsParser();
this.command = (InsertIntoTableCommand) parser.parseSingle(sql);
this.command.setLabelName(Optional.of(getJobId() + LABEL_SPLITTER + getTaskId()));
this.command.setJobId(getTaskId());
}
super.before();
@ -128,14 +170,6 @@ public class InsertTask extends AbstractTask {
return new TUniqueId(taskId.getMostSignificantBits(), taskId.getLeastSignificantBits());
}
public InsertTask(String labelName, String currentDb, String sql, UserIdentity userIdentity) {
this.labelName = labelName;
this.sql = sql;
this.currentDb = currentDb;
this.userIdentity = userIdentity;
}
@Override
public void run() throws JobException {
try {
@ -143,7 +177,7 @@ public class InsertTask extends AbstractTask {
log.info("task has been canceled, task id is {}", getTaskId());
return;
}
command.run(ctx, stmtExecutor);
command.runWithUpdateInfo(ctx, stmtExecutor, loadStatistic);
} catch (Exception e) {
log.warn("execute insert task error, job id is {}, task id is {},sql is {}", getJobId(),
getTaskId(), sql, e);
@ -178,42 +212,28 @@ public class InsertTask extends AbstractTask {
@Override
public TRow getTvfInfo() {
TRow trow = new TRow();
if (loadJob == null) {
if (jobInfo == null) {
// if the task has not started, the job info is null, so return the pending task show info
return getPendingTaskTVFInfo();
}
trow.addToColumnValue(new TCell().setStringVal(String.valueOf(loadJob.getId())));
trow.addToColumnValue(new TCell().setStringVal(String.valueOf(jobInfo.getJobId())));
trow.addToColumnValue(new TCell().setStringVal(String.valueOf(getJobId())));
trow.addToColumnValue(new TCell().setStringVal(loadJob.getLabel()));
trow.addToColumnValue(new TCell().setStringVal(loadJob.getState().name()));
// etl info
String etlInfo = FeConstants.null_string;
if (!loadJob.getLoadingStatus().getCounters().isEmpty()) {
etlInfo = Joiner.on("; ").withKeyValueSeparator("=").join(loadJob.getLoadingStatus().getCounters());
}
trow.addToColumnValue(new TCell().setStringVal(etlInfo));
// task info
String taskInfo = "cluster:" + loadJob.getResourceName() + "; timeout(s):" + loadJob.getTimeout()
+ "; max_filter_ratio:" + loadJob.getMaxFilterRatio() + "; priority:" + loadJob.getPriority();
trow.addToColumnValue(new TCell().setStringVal(taskInfo));
trow.addToColumnValue(new TCell().setStringVal(labelName));
trow.addToColumnValue(new TCell().setStringVal(jobInfo.getJobStatus().name()));
// err msg
String errMsg = FeConstants.null_string;
if (loadJob.getFailMsg() != null) {
errMsg = "type:" + loadJob.getFailMsg().getCancelType() + "; msg:" + loadJob.getFailMsg().getMsg();
if (failMsg != null) {
errMsg = "type:" + failMsg.getCancelType() + "; msg:" + failMsg.getMsg();
}
trow.addToColumnValue(new TCell().setStringVal(errMsg));
// create time
trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(loadJob.getCreateTimestamp())));
trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(jobInfo.getCreateTimeMs())));
// load end time
trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(loadJob.getFinishTimestamp())));
trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(jobInfo.getFinishTimeMs())));
// tracking url
trow.addToColumnValue(new TCell().setStringVal(loadJob.getLoadingStatus().getTrackingUrl()));
trow.addToColumnValue(new TCell().setStringVal(loadJob.getLoadStatistic().toJson()));
trow.addToColumnValue(new TCell().setStringVal(loadJob.getUserInfo().getQualifiedUser()));
trow.addToColumnValue(new TCell().setStringVal(trackingUrl));
trow.addToColumnValue(new TCell().setStringVal(loadStatistic.toJson()));
trow.addToColumnValue(new TCell().setStringVal(userIdentity.getQualifiedUser()));
return trow;
}
@ -225,8 +245,6 @@ public class InsertTask extends AbstractTask {
trow.addToColumnValue(new TCell().setStringVal(getJobId() + LABEL_SPLITTER + getTaskId()));
trow.addToColumnValue(new TCell().setStringVal(getStatus().name()));
trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string));
trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string));
trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string));
trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(getCreateTimeMs())));
trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string));
trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string));


@ -125,7 +125,7 @@ public class MTMVJob extends AbstractJob<MTMVTask, MTMVTaskContext> {
task.setTaskType(taskType);
ArrayList<MTMVTask> tasks = new ArrayList<>();
tasks.add(task);
super.initTasks(tasks);
super.initTasks(tasks, taskType);
return tasks;
}


@ -17,7 +17,15 @@
package org.apache.doris.job.manager;
import org.apache.doris.analysis.CancelLoadStmt;
import org.apache.doris.analysis.CompoundPredicate;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.CaseSensibility;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.PatternMatcher;
import org.apache.doris.common.PatternMatcherWrapper;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.LogBuilder;
import org.apache.doris.common.util.LogKey;
@ -26,44 +34,85 @@ import org.apache.doris.job.common.JobStatus;
import org.apache.doris.job.common.JobType;
import org.apache.doris.job.common.TaskType;
import org.apache.doris.job.exception.JobException;
import org.apache.doris.job.extensions.insert.InsertJob;
import org.apache.doris.job.scheduler.JobScheduler;
import org.apache.doris.job.task.AbstractTask;
import org.apache.doris.load.loadv2.JobState;
import com.google.common.collect.Lists;
import lombok.extern.log4j.Log4j2;
import org.apache.commons.lang3.StringUtils;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
@Log4j2
public class JobManager<T extends AbstractJob<?, C>, C> implements Writable {
private final ConcurrentHashMap<Long, T> jobMap = new ConcurrentHashMap<>(32);
private JobScheduler jobScheduler;
private JobScheduler<T, C> jobScheduler;
// lock for job
// lock is private and must use after db lock
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
private void readLock() {
lock.readLock().lock();
}
private void readUnlock() {
lock.readLock().unlock();
}
private void writeLock() {
lock.writeLock().lock();
}
private void writeUnlock() {
lock.writeLock().unlock();
}
public void start() {
jobScheduler = new JobScheduler(jobMap);
jobScheduler = new JobScheduler<T, C>(jobMap);
jobScheduler.start();
}
public void registerJob(T job) throws JobException {
job.checkJobParams();
checkJobNameExist(job.getJobName());
if (jobMap.get(job.getJobId()) != null) {
throw new JobException("job id exist, jobId:" + job.getJobId());
}
Env.getCurrentEnv().getEditLog().logCreateJob(job);
jobMap.put(job.getJobId(), job);
//check its need to scheduler
jobScheduler.scheduleOneJob(job);
/**
* get running job
*
* @param jobId id
* @return running job
*/
public T getJob(long jobId) {
return jobMap.get(jobId);
}
public void registerJob(T job) throws JobException {
writeLock();
try {
job.onRegister();
job.checkJobParams();
checkJobNameExist(job.getJobName());
if (jobMap.get(job.getJobId()) != null) {
throw new JobException("job id exist, jobId:" + job.getJobId());
}
jobMap.put(job.getJobId(), job);
//check its need to scheduler
jobScheduler.scheduleOneJob(job);
job.logCreateOperation();
} finally {
writeUnlock();
}
}
private void checkJobNameExist(String jobName) throws JobException {
if (jobMap.values().stream().anyMatch(a -> a.getJobName().equals(jobName))) {
@ -72,11 +121,17 @@ public class JobManager<T extends AbstractJob<?, C>, C> implements Writable {
}
public void unregisterJob(Long jobId) throws JobException {
checkJobExist(jobId);
jobMap.get(jobId).setJobStatus(JobStatus.STOPPED);
jobMap.get(jobId).cancelAllTasks();
Env.getCurrentEnv().getEditLog().logDeleteJob(jobMap.get(jobId));
jobMap.remove(jobId);
writeLock();
try {
checkJobExist(jobId);
jobMap.get(jobId).setJobStatus(JobStatus.STOPPED);
jobMap.get(jobId).cancelAllTasks();
jobMap.get(jobId).logFinalOperation();
jobMap.get(jobId).onUnRegister();
jobMap.remove(jobId);
} finally {
writeUnlock();
}
}
public void unregisterJob(String jobName) throws JobException {
@ -95,7 +150,7 @@ public class JobManager<T extends AbstractJob<?, C>, C> implements Writable {
public void alterJobStatus(Long jobId, JobStatus status) throws JobException {
checkJobExist(jobId);
jobMap.get(jobId).updateJobStatus(status);
Env.getCurrentEnv().getEditLog().logUpdateJob(jobMap.get(jobId));
jobMap.get(jobId).logUpdateOperation();
}
public void alterJobStatus(String jobName, JobStatus jobStatus) throws JobException {
@ -169,13 +224,12 @@ public class JobManager<T extends AbstractJob<?, C>, C> implements Writable {
jobScheduler.schedulerInstantJob(jobMap.get(jobId), TaskType.MANUAL, context);
}
public void replayCreateJob(T job) {
public void replayCreateJob(T job) throws JobException {
if (jobMap.containsKey(job.getJobId())) {
return;
}
jobMap.putIfAbsent(job.getJobId(), job);
log.info(new LogBuilder(LogKey.SCHEDULER_JOB, job.getJobId())
.add("msg", "replay create scheduler job").build());
job.onReplayCreate();
}
/**
@ -187,13 +241,12 @@ public class JobManager<T extends AbstractJob<?, C>, C> implements Writable {
.add("msg", "replay update scheduler job").build());
}
public void replayDeleteJob(T job) {
if (null == jobMap.get(job.getJobId())) {
public void replayEndJob(T replayJob) throws JobException {
T job = jobMap.get(replayJob.getJobId());
if (null == job) {
return;
}
jobMap.remove(job.getJobId());
log.info(new LogBuilder(LogKey.SCHEDULER_JOB, job.getJobId())
.add("msg", "replay delete scheduler job").build());
job.onReplayEnd(replayJob);
}
public void cancelTaskById(String jobName, Long taskId) throws JobException {
@ -236,4 +289,135 @@ public class JobManager<T extends AbstractJob<?, C>, C> implements Writable {
return jobMap.get(jobId);
}
/**
* get load info by db
*
* @param dbId db id
* @param dbName db name
* @param labelValue label name
* @param accurateMatch accurate match
* @param jobState state
* @return load infos
* @throws AnalysisException ex
*/
public List<List<Comparable>> getLoadJobInfosByDb(long dbId, String dbName,
String labelValue,
boolean accurateMatch,
JobState jobState) throws AnalysisException {
LinkedList<List<Comparable>> loadJobInfos = new LinkedList<>();
if (!Env.getCurrentEnv().getLabelProcessor().existJobs(dbId)) {
return loadJobInfos;
}
readLock();
try {
List<InsertJob> loadJobList = Env.getCurrentEnv().getLabelProcessor()
.filterJobs(dbId, labelValue, accurateMatch);
// check state
for (InsertJob loadJob : loadJobList) {
try {
if (jobState != null && !validState(jobState, loadJob)) {
continue;
}
// add load job info, convert String list to Comparable list
loadJobInfos.add(new ArrayList<>(loadJob.getShowInfo()));
} catch (RuntimeException e) {
// ignore this load job
log.warn("get load job info failed. job id: {}", loadJob.getJobId(), e);
}
}
return loadJobInfos;
} finally {
readUnlock();
}
}
private static boolean validState(JobState jobState, InsertJob loadJob) {
JobStatus status = loadJob.getJobStatus();
switch (status) {
case RUNNING:
return jobState == JobState.PENDING || jobState == JobState.ETL
|| jobState == JobState.LOADING || jobState == JobState.COMMITTED;
case STOPPED:
return jobState == JobState.CANCELLED;
case FINISHED:
return jobState == JobState.FINISHED;
default:
return false;
}
}
public void cancelLoadJob(CancelLoadStmt cs)
throws JobException, AnalysisException, DdlException {
String dbName = cs.getDbName();
String label = cs.getLabel();
String state = cs.getState();
CompoundPredicate.Operator operator = cs.getOperator();
Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(dbName);
// List of load jobs waiting to be cancelled
List<InsertJob> unfinishedLoadJob;
readLock();
try {
List<InsertJob> loadJobs = Env.getCurrentEnv().getLabelProcessor().getJobs(db);
List<InsertJob> matchLoadJobs = Lists.newArrayList();
addNeedCancelLoadJob(label, state, operator, loadJobs, matchLoadJobs);
if (matchLoadJobs.isEmpty()) {
throw new JobException("Load job does not exist");
}
// check state here
unfinishedLoadJob =
matchLoadJobs.stream().filter(InsertJob::isRunning)
.collect(Collectors.toList());
if (unfinishedLoadJob.isEmpty()) {
throw new JobException("There is no uncompleted job");
}
} finally {
readUnlock();
}
for (InsertJob loadJob : unfinishedLoadJob) {
try {
unregisterJob(loadJob.getJobId());
} catch (JobException e) {
log.warn("Fail to cancel job, its label: {}", loadJob.getLabelName());
}
}
}
private static void addNeedCancelLoadJob(String label, String state,
CompoundPredicate.Operator operator, List<InsertJob> loadJobs,
List<InsertJob> matchLoadJobs)
throws AnalysisException {
PatternMatcher matcher = PatternMatcherWrapper.createMysqlPattern(label,
CaseSensibility.LABEL.getCaseSensibility());
matchLoadJobs.addAll(
loadJobs.stream()
.filter(job -> !job.isCancelled())
.filter(job -> {
if (operator != null) {
// compound
boolean labelFilter =
label.contains("%") ? matcher.match(job.getLabelName())
: job.getLabelName().equalsIgnoreCase(label);
boolean stateFilter = job.getJobStatus().name().equalsIgnoreCase(state);
return CompoundPredicate.Operator.AND.equals(operator) ? labelFilter && stateFilter :
labelFilter || stateFilter;
}
if (StringUtils.isNotEmpty(label)) {
return label.contains("%") ? matcher.match(job.getLabelName())
: job.getLabelName().equalsIgnoreCase(label);
}
if (StringUtils.isNotEmpty(state)) {
return job.getJobStatus().name().equalsIgnoreCase(state);
}
return false;
}).collect(Collectors.toList())
);
}
// public void updateJobProgress(Long jobId, Long beId, TUniqueId loadId, TUniqueId fragmentId, long scannedRows,
// long scannedBytes, boolean isDone) {
// AbstractJob job = jobMap.get(jobId);
// if (job != null) {
// job.updateLoadingStatus(beId, loadId, fragmentId, scannedRows, scannedBytes, isDone);
// }
// }
}


@ -27,6 +27,7 @@ import org.apache.doris.job.exception.JobException;
import com.google.gson.annotations.SerializedName;
import lombok.Data;
import lombok.extern.log4j.Log4j2;
import org.apache.commons.lang3.RandomUtils;
@Data
@Log4j2
@ -52,6 +53,15 @@ public abstract class AbstractTask implements Task {
@SerializedName(value = "emg")
private String errMsg;
public AbstractTask() {
taskId = getNextTaskId();
}
private static long getNextTaskId() {
// do not use Env.getNextId(), just generate id without logging
return System.nanoTime() + RandomUtils.nextInt();
}
@Override
public void onFail(String msg) throws JobException {
status = TaskStatus.FAILED;


@ -73,6 +73,7 @@ import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.OriginStatement;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.scheduler.exception.JobException;
import org.apache.doris.scheduler.executor.TransientTaskExecutor;
import org.apache.doris.thrift.TNetworkAddress;
import com.google.common.base.Preconditions;
@ -198,7 +199,7 @@ public class ExportJob implements Writable {
private List<ExportTaskExecutor> jobExecutorList;
private ConcurrentHashMap<Long, ExportTaskExecutor> taskIdToExecutor = new ConcurrentHashMap<>();
private ConcurrentHashMap<Long, TransientTaskExecutor> taskIdToExecutor = new ConcurrentHashMap<>();
private Integer finishedTaskCount = 0;
private List<List<OutfileInfo>> allOutfileInfo = Lists.newArrayList();
@ -380,6 +381,10 @@ public class ExportJob implements Writable {
return statementBase;
}
public List<? extends TransientTaskExecutor> getTaskExecutors() {
return jobExecutorList;
}
private void generateExportJobExecutor() {
jobExecutorList = Lists.newArrayList();
for (List<StatementBase> selectStmts : selectStmtListPerParallel) {
@ -607,7 +612,7 @@ public class ExportJob implements Writable {
// we need cancel all task
taskIdToExecutor.keySet().forEach(id -> {
try {
Env.getCurrentEnv().getExportTaskRegister().cancelTask(id);
Env.getCurrentEnv().getTransientTaskManager().cancelMemoryTask(id);
} catch (JobException e) {
LOG.warn("cancel export task {} exception: {}", id, e);
}


@ -59,23 +59,26 @@ import java.util.stream.Collectors;
public class ExportMgr {
private static final Logger LOG = LogManager.getLogger(ExportJob.class);
// lock for export job
// lock is private and must use after db lock
private ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
private Map<Long, ExportJob> exportIdToJob = Maps.newHashMap(); // exportJobId to exportJob
// dbid -> <label -> job>
private Map<Long, Map<String, Long>> dbTolabelToExportJobId = Maps.newHashMap();
// lock for export job
// lock is private and must use after db lock
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
public ExportMgr() {
}
public void readLock() {
public List<ExportJob> getJobs() {
return Lists.newArrayList(exportIdToJob.values());
}
private void readLock() {
lock.readLock().lock();
}
public void readUnlock() {
private void readUnlock() {
lock.readLock().unlock();
}
@ -87,10 +90,6 @@ public class ExportMgr {
lock.writeLock().unlock();
}
public List<ExportJob> getJobs() {
return Lists.newArrayList(exportIdToJob.values());
}
public void addExportJobAndRegisterTask(ExportJob job) throws Exception {
long jobId = Env.getCurrentEnv().getNextId();
job.setId(jobId);
@ -101,9 +100,8 @@ public class ExportMgr {
throw new LabelAlreadyUsedException(job.getLabel());
}
unprotectAddJob(job);
job.getJobExecutorList().forEach(executor -> {
Long taskId = Env.getCurrentEnv().getExportTaskRegister().registerTask(executor);
executor.setTaskId(taskId);
job.getTaskExecutors().forEach(executor -> {
Long taskId = Env.getCurrentEnv().getTransientTaskManager().addMemoryTask(executor);
job.getTaskIdToExecutor().put(taskId, executor);
});
Env.getCurrentEnv().getEditLog().logExportCreate(job);


@ -66,12 +66,18 @@ public class ExportTaskExecutor implements TransientTaskExecutor {
private AtomicBoolean isFinished;
ExportTaskExecutor(List<StatementBase> selectStmtLists, ExportJob exportJob) {
this.taskId = UUID.randomUUID().getMostSignificantBits();
this.selectStmtLists = selectStmtLists;
this.exportJob = exportJob;
this.isCanceled = new AtomicBoolean(false);
this.isFinished = new AtomicBoolean(false);
}
@Override
public Long getId() {
return taskId;
}
@Override
public void execute() throws JobException {
if (isCanceled.get()) {


@ -37,7 +37,6 @@ import org.apache.doris.common.QuotaExceedException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.LogBuilder;
import org.apache.doris.common.util.LogKey;
import org.apache.doris.common.util.TimeUtils;
@ -61,11 +60,9 @@ import org.apache.doris.transaction.TransactionException;
import org.apache.doris.transaction.TransactionState;
import com.google.common.base.Joiner;
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.collect.Table;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.annotations.SerializedName;
@ -138,108 +135,6 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
protected String comment = "";
public static class LoadStatistic {
// number of rows processed on BE, this number will be updated periodically by query report.
// A load job may has several load tasks(queries), and each task has several fragments.
// each fragment will report independently.
// load task id -> fragment id -> rows count
private Table<TUniqueId, TUniqueId, Long> counterTbl = HashBasedTable.create();
// load task id -> fragment id -> load bytes
private Table<TUniqueId, TUniqueId, Long> loadBytes = HashBasedTable.create();
// load task id -> unfinished backend id list
private Map<TUniqueId, List<Long>> unfinishedBackendIds = Maps.newHashMap();
// load task id -> all backend id list
private Map<TUniqueId, List<Long>> allBackendIds = Maps.newHashMap();
// number of file to be loaded
public int fileNum = 0;
public long totalFileSizeB = 0;
// init the statistic of specified load task
public synchronized void initLoad(TUniqueId loadId, Set<TUniqueId> fragmentIds, List<Long> relatedBackendIds) {
counterTbl.rowMap().remove(loadId);
for (TUniqueId fragId : fragmentIds) {
counterTbl.put(loadId, fragId, 0L);
}
loadBytes.rowMap().remove(loadId);
for (TUniqueId fragId : fragmentIds) {
loadBytes.put(loadId, fragId, 0L);
}
allBackendIds.put(loadId, relatedBackendIds);
// need to get a copy of relatedBackendIds, so that when we modify the "relatedBackendIds" in
// allBackendIds, the list in unfinishedBackendIds will not be changed.
unfinishedBackendIds.put(loadId, Lists.newArrayList(relatedBackendIds));
}
public synchronized void removeLoad(TUniqueId loadId) {
counterTbl.rowMap().remove(loadId);
loadBytes.rowMap().remove(loadId);
unfinishedBackendIds.remove(loadId);
allBackendIds.remove(loadId);
}
public synchronized void updateLoadProgress(long backendId, TUniqueId loadId, TUniqueId fragmentId,
long rows, long bytes, boolean isDone) {
if (counterTbl.contains(loadId, fragmentId)) {
counterTbl.put(loadId, fragmentId, rows);
}
if (loadBytes.contains(loadId, fragmentId)) {
loadBytes.put(loadId, fragmentId, bytes);
}
if (isDone && unfinishedBackendIds.containsKey(loadId)) {
unfinishedBackendIds.get(loadId).remove(backendId);
}
}
public synchronized long getScannedRows() {
long total = 0;
for (long rows : counterTbl.values()) {
total += rows;
}
return total;
}
public synchronized long getLoadBytes() {
long total = 0;
for (long bytes : loadBytes.values()) {
total += bytes;
}
return total;
}
public synchronized String toJson() {
long total = 0;
for (long rows : counterTbl.values()) {
total += rows;
}
long totalBytes = 0;
for (long bytes : loadBytes.values()) {
totalBytes += bytes;
}
Map<String, Object> details = Maps.newHashMap();
details.put("ScannedRows", total);
details.put("LoadBytes", totalBytes);
details.put("FileNumber", fileNum);
details.put("FileSize", totalFileSizeB);
details.put("TaskNumber", counterTbl.rowMap().size());
details.put("Unfinished backends", getPrintableMap(unfinishedBackendIds));
details.put("All backends", getPrintableMap(allBackendIds));
Gson gson = new Gson();
return gson.toJson(details);
}
private Map<String, List<Long>> getPrintableMap(Map<TUniqueId, List<Long>> map) {
Map<String, List<Long>> newMap = Maps.newHashMap();
for (Map.Entry<TUniqueId, List<Long>> entry : map.entrySet()) {
newMap.put(DebugUtil.printId(entry.getKey()), entry.getValue());
}
return newMap;
}
}
public LoadJob(EtlJobType jobType) {
this.jobType = jobType;


@ -278,7 +278,7 @@ public class LoadManager implements Writable {
public void cancelLoadJob(CancelLoadStmt stmt) throws DdlException, AnalysisException {
Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(stmt.getDbName());
// List of load jobs waiting to be cancelled
List<LoadJob> uncompletedLoadJob = Lists.newArrayList();
List<LoadJob> unfinishedLoadJob;
readLock();
try {
Map<String, List<LoadJob>> labelToLoadJobs = dbIdToLabelToLoadJobs.get(db.getId());
@ -293,15 +293,15 @@ public class LoadManager implements Writable {
throw new DdlException("Load job does not exist");
}
// check state here
uncompletedLoadJob =
unfinishedLoadJob =
matchLoadJobs.stream().filter(entity -> !entity.isTxnDone()).collect(Collectors.toList());
if (uncompletedLoadJob.isEmpty()) {
if (unfinishedLoadJob.isEmpty()) {
throw new DdlException("There is no uncompleted job");
}
} finally {
readUnlock();
}
for (LoadJob loadJob : uncompletedLoadJob) {
for (LoadJob loadJob : unfinishedLoadJob) {
try {
loadJob.cancelJob(new FailMsg(FailMsg.CancelType.USER_CANCEL, "user cancel"));
} catch (DdlException e) {

View File

@ -0,0 +1,142 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.load.loadv2;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.thrift.TUniqueId;
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Table;
import com.google.gson.Gson;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
public class LoadStatistic {
// Number of rows processed on BE; this number is updated periodically by query reports.
// A load job may have several load tasks (queries), and each task may have several fragments;
// each fragment reports its progress independently.
// load task id -> fragment id -> rows count
private Table<TUniqueId, TUniqueId, Long> counterTbl = HashBasedTable.create();
// load task id -> fragment id -> load bytes
private Table<TUniqueId, TUniqueId, Long> loadBytes = HashBasedTable.create();
// load task id -> unfinished backend id list
private Map<TUniqueId, List<Long>> unfinishedBackendIds = Maps.newHashMap();
// load task id -> all backend id list
private Map<TUniqueId, List<Long>> allBackendIds = Maps.newHashMap();
private Map<String, String> counters = new HashMap<>();
// number of files to be loaded
public int fileNum = 0;
public long totalFileSizeB = 0;
// init the statistics of the specified load task
public synchronized void initLoad(TUniqueId loadId, Set<TUniqueId> fragmentIds, List<Long> relatedBackendIds) {
counterTbl.rowMap().remove(loadId);
for (TUniqueId fragId : fragmentIds) {
counterTbl.put(loadId, fragId, 0L);
}
loadBytes.rowMap().remove(loadId);
for (TUniqueId fragId : fragmentIds) {
loadBytes.put(loadId, fragId, 0L);
}
allBackendIds.put(loadId, relatedBackendIds);
// store a copy of relatedBackendIds here, so that modifying the list kept in
// allBackendIds does not change the list kept in unfinishedBackendIds.
unfinishedBackendIds.put(loadId, Lists.newArrayList(relatedBackendIds));
}
public synchronized void removeLoad(TUniqueId loadId) {
counterTbl.rowMap().remove(loadId);
loadBytes.rowMap().remove(loadId);
unfinishedBackendIds.remove(loadId);
allBackendIds.remove(loadId);
}
public synchronized void updateLoadProgress(long backendId, TUniqueId loadId, TUniqueId fragmentId,
long rows, long bytes, boolean isDone) {
if (counterTbl.contains(loadId, fragmentId)) {
counterTbl.put(loadId, fragmentId, rows);
}
if (loadBytes.contains(loadId, fragmentId)) {
loadBytes.put(loadId, fragmentId, bytes);
}
if (isDone && unfinishedBackendIds.containsKey(loadId)) {
unfinishedBackendIds.get(loadId).remove(backendId);
}
}
public synchronized long getScannedRows() {
long total = 0;
for (long rows : counterTbl.values()) {
total += rows;
}
return total;
}
public synchronized long getLoadBytes() {
long total = 0;
for (long bytes : loadBytes.values()) {
total += bytes;
}
return total;
}
public Map<String, String> getCounters() {
// TODO: add extra statistics to counters
return counters;
}
public synchronized String toJson() {
long total = 0;
for (long rows : counterTbl.values()) {
total += rows;
}
long totalBytes = 0;
for (long bytes : loadBytes.values()) {
totalBytes += bytes;
}
Map<String, Object> details = Maps.newHashMap();
details.put("ScannedRows", total);
details.put("LoadBytes", totalBytes);
details.put("FileNumber", fileNum);
details.put("FileSize", totalFileSizeB);
details.put("TaskNumber", counterTbl.rowMap().size());
details.put("Unfinished backends", getPrintableMap(unfinishedBackendIds));
details.put("All backends", getPrintableMap(allBackendIds));
Gson gson = new Gson();
return gson.toJson(details);
}
private Map<String, List<Long>> getPrintableMap(Map<TUniqueId, List<Long>> map) {
Map<String, List<Long>> newMap = Maps.newHashMap();
for (Map.Entry<TUniqueId, List<Long>> entry : map.entrySet()) {
newMap.put(DebugUtil.printId(entry.getKey()), entry.getValue());
}
return newMap;
}
}
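As a rough usage sketch of the extracted class (not part of the patch; the load id, fragment id and backend id below are made up, Collections is java.util.Collections, and Gson's key order may differ):

LoadStatistic statistic = new LoadStatistic();
TUniqueId loadId = new TUniqueId(1L, 1L);        // hypothetical load task id
TUniqueId fragmentId = new TUniqueId(1L, 2L);    // hypothetical fragment id
statistic.fileNum = 1;
statistic.totalFileSizeB = 65536L;
statistic.initLoad(loadId, Collections.singleton(fragmentId), Lists.newArrayList(10001L));
statistic.updateLoadProgress(10001L, loadId, fragmentId, 1000L, 65536L, true);
// statistic.toJson() then yields something like:
// {"ScannedRows":1000,"LoadBytes":65536,"FileNumber":1,"FileSize":65536,
//  "TaskNumber":1,"Unfinished backends":{"1-1":[]},"All backends":{"1-1":[10001]}}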

View File

@ -0,0 +1,181 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.jobs.load;
import org.apache.doris.catalog.Database;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.CaseSensibility;
import org.apache.doris.common.LabelAlreadyUsedException;
import org.apache.doris.common.PatternMatcher;
import org.apache.doris.common.PatternMatcherWrapper;
import org.apache.doris.job.exception.JobException;
import org.apache.doris.job.extensions.insert.InsertJob;
import com.google.common.base.Strings;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
/**
* Label manager that tracks insert jobs per database, keyed by label name.
*/
public class LabelProcessor {
private final Map<Long, Map<String, List<InsertJob>>> dbIdToLabelToLoadJobs = new ConcurrentHashMap<>();
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
private void readLock() {
lock.readLock().lock();
}
private void readUnlock() {
lock.readLock().unlock();
}
private void writeLock() {
lock.writeLock().lock();
}
private void writeUnlock() {
lock.writeLock().unlock();
}
/**
* Get all label jobs of the given database.
* @param db database
* @return jobs registered under any label of the database
* @throws JobException if the database has no label jobs
*/
public List<InsertJob> getJobs(Database db) throws JobException {
readLock();
try {
Map<String, List<InsertJob>> labelToLoadJobs = dbIdToLabelToLoadJobs.get(db.getId());
if (labelToLoadJobs == null) {
throw new JobException("Load job does not exist");
}
return labelToLoadJobs.values().stream().flatMap(Collection::stream).collect(Collectors.toList());
} finally {
readUnlock();
}
}
/**
* Register a job under its label.
*
* @param job job carrying the label
* @throws LabelAlreadyUsedException if the label is already used in the database
*/
public void addJob(InsertJob job) throws LabelAlreadyUsedException {
writeLock();
try {
Map<String, List<InsertJob>> labelToLoadJobs;
if (!dbIdToLabelToLoadJobs.containsKey(job.getDbId())) {
labelToLoadJobs = new ConcurrentHashMap<>();
dbIdToLabelToLoadJobs.put(job.getDbId(), labelToLoadJobs);
}
labelToLoadJobs = dbIdToLabelToLoadJobs.get(job.getDbId());
if (labelToLoadJobs.containsKey(job.getLabelName())) {
throw new LabelAlreadyUsedException(job.getLabelName());
} else {
labelToLoadJobs.put(job.getLabelName(), new ArrayList<>());
}
labelToLoadJobs.get(job.getLabelName()).add(job);
} finally {
writeUnlock();
}
}
/**
* Remove all jobs registered under the given label.
* @param dbId db id
* @param labelName label name
*/
public void removeJob(long dbId, String labelName) {
writeLock();
try {
if (dbIdToLabelToLoadJobs.containsKey(dbId)) {
dbIdToLabelToLoadJobs.get(dbId).remove(labelName);
}
} finally {
writeUnlock();
}
}
public void cleanOldLabels() throws JobException {
// TODO: kept as a placeholder for the label cleaner implementation
}
/**
* Filter jobs by label, supporting both accurate matching and MySQL-style pattern matching.
* @param dbId dbId
* @param labelValue label value or pattern
* @param accurateMatch if true, look the label up directly in the map; otherwise match it as a pattern
* @return jobs matching the label
*/
public List<InsertJob> filterJobs(long dbId, String labelValue, boolean accurateMatch)
throws AnalysisException {
List<InsertJob> loadJobList = new ArrayList<>();
readLock();
try {
Map<String, List<InsertJob>> labelToLoadJobs = this.dbIdToLabelToLoadJobs.get(dbId);
if (Strings.isNullOrEmpty(labelValue)) {
loadJobList.addAll(
labelToLoadJobs.values().stream().flatMap(Collection::stream).collect(Collectors.toList()));
} else {
// check label value
if (accurateMatch) {
if (!labelToLoadJobs.containsKey(labelValue)) {
return new ArrayList<>();
}
loadJobList.addAll(labelToLoadJobs.get(labelValue));
} else {
// non-accurate match
PatternMatcher matcher =
PatternMatcherWrapper.createMysqlPattern(labelValue,
CaseSensibility.LABEL.getCaseSensibility());
for (Map.Entry<String, List<InsertJob>> entry : labelToLoadJobs.entrySet()) {
if (matcher.match(entry.getKey())) {
loadJobList.addAll(entry.getValue());
}
}
}
}
} finally {
readUnlock();
}
return loadJobList;
}
/**
* Check whether the database has any label jobs registered.
* @param dbId dbId
* @return true if the database has jobs
*/
public boolean existJobs(long dbId) {
readLock();
try {
return dbIdToLabelToLoadJobs.containsKey(dbId);
} finally {
readUnlock();
}
}
}
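A minimal usage sketch (the getLabelProcessor() accessor is an assumption based on the new Env field, insertJob and dbId are stand-ins, and exception handling is elided):

LabelProcessor labelProcessor = Env.getCurrentEnv().getLabelProcessor();
labelProcessor.addJob(insertJob);                     // throws LabelAlreadyUsedException on a duplicate label
List<InsertJob> exact = labelProcessor.filterJobs(dbId, "broker_load_2023", true);   // accurate match
List<InsertJob> fuzzy = labelProcessor.filterJobs(dbId, "broker_load_%", false);     // MySQL-style pattern
labelProcessor.removeJob(dbId, "broker_load_2023");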

View File

@ -945,7 +945,8 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> {
LoadTask.MergeType mergeType = ddc.mergeType() == null ? LoadTask.MergeType.APPEND
: LoadTask.MergeType.valueOf(ddc.mergeType().getText());
Optional<String> fileFormat = ddc.format == null ? Optional.empty() : Optional.of(ddc.format.getText());
Optional<String> fileFormat = ddc.format == null ? Optional.empty()
: Optional.of(visitIdentifierOrStringLiteral(ddc.format));
Optional<String> separator = ddc.separator == null ? Optional.empty() : Optional.of(ddc.separator.getText()
.substring(1, ddc.separator.getText().length() - 1));
Optional<String> comma = ddc.comma == null ? Optional.empty() : Optional.of(ddc.comma.getText()
@ -970,7 +971,7 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> {
}
String labelName = ctx.lableName.getText();
Map<String, String> properties = visitPropertyItemList(ctx.properties);
String commentSpec = ctx.commentSpec() == null ? "" : ctx.commentSpec().STRING_LITERAL().getText();
String commentSpec = ctx.commentSpec() == null ? "''" : ctx.commentSpec().STRING_LITERAL().getText();
String comment =
LogicalPlanBuilderAssistant.escapeBackSlash(commentSpec.substring(1, commentSpec.length() - 1));
return new LoadCommand(labelName, dataDescriptions.build(), bulkDesc, properties, comment);
@ -1044,6 +1045,15 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> {
}
}
@Override
public String visitIdentifierOrStringLiteral(DorisParser.IdentifierOrStringLiteralContext ctx) {
if (ctx.STRING_LITERAL() != null) {
return ctx.STRING_LITERAL().getText().substring(1, ctx.STRING_LITERAL().getText().length() - 1);
} else {
return ctx.identifier().getText();
}
}
@Override
public UserIdentity visitUserIdentify(UserIdentifyContext ctx) {
String user = visitIdentifierOrText(ctx.user);
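The effect of the new helper is only to normalize the two ways a format can be written; a minimal plain-Java illustration of the STRING_LITERAL branch (not parser code):

// FORMAT AS parquet   -> identifier text used as-is          -> "parquet"
// FORMAT AS "parquet" -> string literal with quotes stripped -> "parquet"
String literalText = "\"parquet\"";
String format = literalText.substring(1, literalText.length() - 1); // same stripping as visitIdentifierOrStringLiteral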

View File

@ -318,11 +318,14 @@ public class InsertExecutor {
if (0 != jobId) {
etlJobType = EtlJobType.INSERT_JOB;
}
ctx.getEnv().getLoadManager()
.recordFinishedLoadJob(labelName, txnId, database.getFullName(),
table.getId(),
etlJobType, createAt, throwable == null ? "" : throwable.getMessage(),
coordinator.getTrackingUrl(), userIdentity, jobId);
if (!Config.enable_nereids_load) {
// only record the finished job here for the legacy loadv2 path
ctx.getEnv().getLoadManager()
.recordFinishedLoadJob(labelName, txnId, database.getFullName(),
table.getId(),
etlJobType, createAt, throwable == null ? "" : throwable.getMessage(),
coordinator.getTrackingUrl(), userIdentity, jobId);
}
} catch (MetaNotFoundException e) {
LOG.warn("Record info of insert load with error {}", e.getMessage(), e);
errMsg = "Record info of insert load with error " + e.getMessage();

View File

@ -27,6 +27,7 @@ import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.ProfileManager.ProfileType;
import org.apache.doris.load.loadv2.LoadStatistic;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.exceptions.AnalysisException;
@ -95,6 +96,10 @@ public class InsertIntoTableCommand extends Command implements ForwardWithSync,
this.allowAutoPartition = true;
}
public Optional<String> getLabelName() {
return labelName;
}
public void setLabelName(Optional<String> labelName) {
this.labelName = labelName;
}
@ -109,6 +114,16 @@ public class InsertIntoTableCommand extends Command implements ForwardWithSync,
@Override
public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
runInternal(ctx, executor);
}
public void runWithUpdateInfo(ConnectContext ctx, StmtExecutor executor,
LoadStatistic loadStatistic) throws Exception {
// TODO: add coordinator statistics
runInternal(ctx, executor);
}
private void runInternal(ConnectContext ctx, StmtExecutor executor) throws Exception {
if (!ctx.getSessionVariable().isEnableNereidsDML()) {
try {
ctx.getSessionVariable().enableFallbackToOriginalPlannerOnce();
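A sketch of how the new entry point is expected to be driven from an insert task (logicalQuery, ctx and executor stand for the planned query, connect context and statement executor; the two-argument constructor mirrors the call used in LoadCommand below, and loadStatistic is currently only passed through, per the TODO):

InsertIntoTableCommand command = new InsertIntoTableCommand(logicalQuery, Optional.of("label_load_1"));
LoadStatistic loadStatistic = new LoadStatistic();
command.runWithUpdateInfo(ctx, executor, loadStatistic);   // behaves like run(); statistics hookup is still TODO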

View File

@ -18,16 +18,20 @@
package org.apache.doris.nereids.trees.plans.commands;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.NereidsException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.profile.Profile;
import org.apache.doris.common.util.FileFormatConstants;
import org.apache.doris.common.util.FileFormatUtils;
import org.apache.doris.datasource.property.constants.S3Properties;
import org.apache.doris.job.base.JobExecuteType;
import org.apache.doris.job.base.JobExecutionConfiguration;
import org.apache.doris.job.extensions.insert.InsertJob;
import org.apache.doris.load.loadv2.LoadTask;
import org.apache.doris.nereids.analyzer.UnboundAlias;
import org.apache.doris.nereids.analyzer.UnboundSlot;
@ -53,7 +57,6 @@ import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.nereids.util.ExpressionUtils;
import org.apache.doris.nereids.util.RelationUtil;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.QueryStateException;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.tablefunction.HdfsTableValuedFunction;
import org.apache.doris.tablefunction.S3TableValuedFunction;
@ -83,10 +86,11 @@ public class LoadCommand extends Command implements ForwardWithSync {
private final String labelName;
private final BulkStorageDesc bulkStorageDesc;
private final Set<String> sinkTableNames = new HashSet<>();
private final List<BulkLoadDataDesc> sourceInfos;
private final Map<String, String> properties;
private final String comment;
private final List<LogicalPlan> plans = new ArrayList<>();
private List<InsertIntoTableCommand> plans = new ArrayList<>();
private Profile profile;
/**
@ -119,15 +123,19 @@ public class LoadCommand extends Command implements ForwardWithSync {
@Override
public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
// TODO: begin txn for multi insert sql
/* this.profile = new Profile("Query", ctx.getSessionVariable().enableProfile);
profile.getSummaryProfile().setQueryBeginTime();
for (BulkLoadDataDesc dataDesc : sourceInfos) {
plans.add(new InsertIntoTableCommand(completeQueryPlan(ctx, dataDesc), Optional.of(labelName), false));
}
profile.getSummaryProfile().setQueryPlanFinishTime();
* executeInsertStmtPlan(ctx, executor, plans); */
throw new AnalysisException("Fallback to legacy planner temporary.");
if (!Config.enable_nereids_load) {
throw new AnalysisException("Fallback to legacy planner temporary.");
}
this.profile = new Profile("Query", ctx.getSessionVariable().enableProfile);
profile.getSummaryProfile().setQueryBeginTime();
if (sourceInfos.size() == 1) {
plans = ImmutableList.of(new InsertIntoTableCommand(completeQueryPlan(ctx, sourceInfos.get(0)),
Optional.of(labelName)));
} else {
throw new AnalysisException("Multi insert into statements are unsupported.");
}
profile.getSummaryProfile().setQueryPlanFinishTime();
submitInsertStmtPlan(ctx, executor, plans);
}
private LogicalPlan completeQueryPlan(ConnectContext ctx, BulkLoadDataDesc dataDesc)
@ -151,6 +159,7 @@ public class LoadCommand extends Command implements ForwardWithSync {
boolean scanAllTvfCol = (tvfProjects.get(0) instanceof UnboundStar);
OlapTable olapTable = getOlapTable(ctx, dataDesc);
sinkTableNames.add(olapTable.getName());
List<Column> olapSchema = olapTable.getBaseSchema();
// map column index to mapping expr
Map<String, Expression> mappingExpressions = dataDesc.getColumnMappings();
@ -471,20 +480,14 @@ public class LoadCommand extends Command implements ForwardWithSync {
return tvfProperties;
}
private void executeInsertStmtPlan(ConnectContext ctx, StmtExecutor executor, List<InsertIntoTableCommand> plans) {
private void submitInsertStmtPlan(ConnectContext ctx, StmtExecutor executor, List<InsertIntoTableCommand> plans) {
try {
for (LogicalPlan logicalPlan : plans) {
((Command) logicalPlan).run(ctx, executor);
}
} catch (QueryStateException e) {
ctx.setState(e.getQueryState());
throw new NereidsException("Command process failed", new AnalysisException(e.getMessage(), e));
} catch (UserException e) {
// Return message to info client what happened.
ctx.getState().setError(e.getMysqlErrorCode(), e.getMessage());
throw new NereidsException("Command process failed", new AnalysisException(e.getMessage(), e));
JobExecutionConfiguration jobExecutionConfiguration = new JobExecutionConfiguration();
jobExecutionConfiguration.setExecuteType(JobExecuteType.INSTANT);
InsertJob jobExecutor = new InsertJob(ctx, executor, labelName, plans,
sinkTableNames, properties, comment, jobExecutionConfiguration);
Env.getCurrentEnv().getJobManager().registerJob(jobExecutor);
} catch (Exception e) {
// Maybe our bug
ctx.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, e.getMessage());
throw new NereidsException("Command process failed.", new AnalysisException(e.getMessage(), e));
}
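Putting the pieces together, the LOAD statement is no longer executed inline: it is planned into insert commands and handed to the job framework as an INSTANT job. A condensed sketch (the insert-into-TVF query in the comment shows the rough shape of the rewritten statement, not output generated verbatim by this code, and exception handling is elided):

// fe.conf: enable_nereids_load = true, otherwise run() falls back to the legacy planner.
// Each BulkLoadDataDesc is planned into roughly:
//   INSERT INTO t1 (k1, k2, v1)
//   SELECT ... FROM s3("uri" = "s3://bucket/path/x.csv", "format" = "csv", ...)
JobExecutionConfiguration config = new JobExecutionConfiguration();
config.setExecuteType(JobExecuteType.INSTANT);
InsertJob job = new InsertJob(ctx, executor, labelName, plans, sinkTableNames, properties, comment, config);
Env.getCurrentEnv().getJobManager().registerJob(job);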

View File

@ -65,7 +65,6 @@ import org.apache.doris.load.ExportJob;
import org.apache.doris.load.ExportJobState;
import org.apache.doris.load.ExportJobStateTransfer;
import org.apache.doris.load.ExportMgr;
import org.apache.doris.load.LoadJob;
import org.apache.doris.load.StreamLoadRecordMgr.FetchStreamLoadRecord;
import org.apache.doris.load.loadv2.LoadJob.LoadJobStateUpdateInfo;
import org.apache.doris.load.loadv2.LoadJobFinalOperation;
@ -673,7 +672,7 @@ public class EditLog {
}
case OperationType.OP_DELETE_SCHEDULER_JOB: {
AbstractJob job = (AbstractJob) journal.getData();
Env.getCurrentEnv().getJobManager().replayDeleteJob(job);
Env.getCurrentEnv().getJobManager().replayEndJob(job);
break;
}
/*case OperationType.OP_CREATE_SCHEDULER_TASK: {
@ -1347,30 +1346,6 @@ public class EditLog {
logEdit(OperationType.OP_RECOVER_TABLE, info);
}
public void logLoadStart(LoadJob job) {
logEdit(OperationType.OP_LOAD_START, job);
}
public void logLoadEtl(LoadJob job) {
logEdit(OperationType.OP_LOAD_ETL, job);
}
public void logLoadLoading(LoadJob job) {
logEdit(OperationType.OP_LOAD_LOADING, job);
}
public void logLoadQuorum(LoadJob job) {
logEdit(OperationType.OP_LOAD_QUORUM, job);
}
public void logLoadCancel(LoadJob job) {
logEdit(OperationType.OP_LOAD_CANCEL, job);
}
public void logLoadDone(LoadJob job) {
logEdit(OperationType.OP_LOAD_DONE, job);
}
public void logDropRollup(DropInfo info) {
logEdit(OperationType.OP_DROP_ROLLUP, info);
}
@ -1644,7 +1619,7 @@ public class EditLog {
logEdit(OperationType.OP_UPDATE_SCHEDULER_JOB, job);
}
public void logDeleteJob(AbstractJob job) {
public void logEndJob(AbstractJob job) {
logEdit(OperationType.OP_DELETE_SCHEDULER_JOB, job);
}
@ -1985,6 +1960,7 @@ public class EditLog {
public void logAlterMTMV(AlterMTMV log) {
logEdit(OperationType.OP_ALTER_MTMV, log);
}
public String getNotReadyReason() {
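A sketch of the write/replay pairing after the rename (both calls appear in this patch; job stands for any scheduler job, and the surrounding lifecycle code is elided):

// master FE: persist the end of a scheduler job
Env.getCurrentEnv().getEditLog().logEndJob(job);                 // writes OP_DELETE_SCHEDULER_JOB
// follower FE / restart: the same journal entry now finalizes the job instead of deleting it
Env.getCurrentEnv().getJobManager().replayEndJob(job);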

View File

@ -174,7 +174,10 @@ public class DdlExecutor {
} else if (ddlStmt instanceof CancelExportStmt) {
env.getExportMgr().cancelExportJob((CancelExportStmt) ddlStmt);
} else if (ddlStmt instanceof CancelLoadStmt) {
env.getLoadManager().cancelLoadJob((CancelLoadStmt) ddlStmt);
CancelLoadStmt cs = (CancelLoadStmt) ddlStmt;
// cancel matching jobs in both the new job manager and the legacy load manager
env.getJobManager().cancelLoadJob(cs);
env.getLoadManager().cancelLoadJob(cs);
} else if (ddlStmt instanceof CreateRoutineLoadStmt) {
env.getRoutineLoadManager().createRoutineLoadJob((CreateRoutineLoadStmt) ddlStmt);
} else if (ddlStmt instanceof PauseRoutineLoadStmt) {

View File

@ -183,6 +183,7 @@ import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.datasource.HMSExternalCatalog;
import org.apache.doris.datasource.MaxComputeExternalCatalog;
import org.apache.doris.job.manager.JobManager;
import org.apache.doris.load.DeleteHandler;
import org.apache.doris.load.ExportJobState;
import org.apache.doris.load.ExportMgr;
@ -1169,32 +1170,36 @@ public class ShowExecutor {
Env env = ctx.getEnv();
DatabaseIf db = ctx.getCurrentCatalog().getDbOrAnalysisException(showStmt.getDbName());
long dbId = db.getId();
List<List<Comparable>> loadInfos;
// combine the List<LoadInfo> of load(v1) and loadManager(v2)
Load load = env.getLoadInstance();
List<List<Comparable>> loadInfos = load.getLoadJobInfosByDb(dbId, db.getFullName(), showStmt.getLabelValue(),
loadInfos = load.getLoadJobInfosByDb(dbId, db.getFullName(), showStmt.getLabelValue(),
showStmt.isAccurateMatch(), showStmt.getStates());
Set<String> statesValue = showStmt.getStates() == null ? null : showStmt.getStates().stream()
.map(entity -> entity.name())
.collect(Collectors.toSet());
loadInfos.addAll(env.getLoadManager()
.getLoadJobInfosByDb(dbId, showStmt.getLabelValue(), showStmt.isAccurateMatch(), statesValue));
// add the load info from the new nereids job manager
JobManager loadMgr = env.getJobManager();
loadInfos.addAll(loadMgr.getLoadJobInfosByDb(dbId, db.getFullName(), showStmt.getLabelValue(),
showStmt.isAccurateMatch(), showStmt.getStateV2()));
// order the result of List<LoadInfo> by orderByPairs in show stmt
List<OrderByPair> orderByPairs = showStmt.getOrderByPairs();
ListComparator<List<Comparable>> comparator = null;
ListComparator<List<Comparable>> comparator;
if (orderByPairs != null) {
OrderByPair[] orderByPairArr = new OrderByPair[orderByPairs.size()];
comparator = new ListComparator<List<Comparable>>(orderByPairs.toArray(orderByPairArr));
comparator = new ListComparator<>(orderByPairs.toArray(orderByPairArr));
} else {
// sort by id asc
comparator = new ListComparator<List<Comparable>>(0);
comparator = new ListComparator<>(0);
}
Collections.sort(loadInfos, comparator);
List<List<String>> rows = Lists.newArrayList();
for (List<Comparable> loadInfo : loadInfos) {
List<String> oneInfo = new ArrayList<String>(loadInfo.size());
List<String> oneInfo = new ArrayList<>(loadInfo.size());
// replace QUORUM_FINISHED -> FINISHED
if (loadInfo.get(LoadProcDir.STATE_INDEX).equals(JobState.QUORUM_FINISHED.name())) {

View File

@ -34,5 +34,7 @@ public interface TransientTaskExecutor {
* Cancel the memory task.
*/
void cancel() throws JobException;
Long getId();
}

View File

@ -23,7 +23,6 @@ import org.apache.doris.scheduler.executor.TransientTaskExecutor;
import lombok.Setter;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
public class TransientTaskManager {
@ -51,8 +50,8 @@ public class TransientTaskManager {
return taskExecutorMap.get(taskId);
}
public Long registerMemoryTask(TransientTaskExecutor executor) {
Long taskId = UUID.randomUUID().getMostSignificantBits();
public Long addMemoryTask(TransientTaskExecutor executor) {
Long taskId = executor.getId();
taskExecutorMap.put(taskId, executor);
disruptor.tryPublishTask(taskId);
return taskId;
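With getId() added to TransientTaskExecutor, the registration key is now owned by the executor itself; a brief usage sketch (insertTaskExecutor stands for any executor implementation):

Long taskId = transientTaskManager.addMemoryTask(insertTaskExecutor);
// taskId.equals(insertTaskExecutor.getId()) now holds, so callers can cancel or look
// the task up later using the id the executor already knows about.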

View File

@ -30,7 +30,7 @@ public class ExportTaskRegister implements TransientTaskRegister {
@Override
public Long registerTask(TransientTaskExecutor executor) {
return transientTaskManager.registerMemoryTask(executor);
return transientTaskManager.addMemoryTask(executor);
}
@Override