diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 04d33fd4cc..c038a9f178 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -1931,6 +1931,18 @@ public class Config extends ConfigBase { @ConfField(masterOnly = true) public static boolean enable_hms_events_incremental_sync = false; + /** + * If set to true, Doris will execute LOAD statements with the new (Nereids) optimizer, + * falling back to the legacy load implementation when that fails. + */ + @ConfField(mutable = true, varType = VariableAnnotation.EXPERIMENTAL, description = { + "当前默认设置为 false,开启后支持使用新优化器的load语句导入数据,失败后会降级旧的load语句。", + "Currently default set to false. After this function is enabled, load statements are " + + "executed by the new optimizer to import data. If that fails, " + + "execution falls back to the old load statement."}) + public static boolean enable_nereids_load = false; + + /** * Maximum number of events to poll in each RPC. */ diff --git a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 index a2f90ea216..dc202fef74 100644 --- a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 +++ b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 @@ -71,11 +71,6 @@ statement (withRemoteStorageSystem)? (PROPERTIES LEFT_PAREN properties=propertyItemList RIGHT_PAREN)? (commentSpec)? #load - | LOAD LABEL lableName=identifier - LEFT_PAREN dataDescs+=dataDesc (COMMA dataDescs+=dataDesc)* RIGHT_PAREN - resourceDesc - (PROPERTIES LEFT_PAREN properties=propertyItemList RIGHT_PAREN)? - (commentSpec)? #resourceLoad | LOAD mysqlDataDesc (PROPERTIES LEFT_PAREN properties=propertyItemList RIGHT_PAREN)? (commentSpec)? #mysqlLoad @@ -131,7 +126,7 @@ dataDesc (PARTITION partition=identifierList)? (COLUMNS TERMINATED BY comma=STRING_LITERAL)? (LINES TERMINATED BY separator=STRING_LITERAL)? - (FORMAT AS format=identifierOrStringLiteral)? - (FORMAT AS format=identifier)? + (FORMAT AS format=identifierOrStringLiteral)? (columns=identifierList)? (columnsFromPath=colFromPath)? (columnMapping=colMappingList)?
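Note on the grammar hunks above: the removed `#resourceLoad` alternative is subsumed by `withRemoteStorageSystem` (which gains a `resourceDesc` alternative further down), and `FORMAT AS` now takes `identifierOrStringLiteral`, so the format may be written either as a bare identifier or as a quoted string. A minimal sketch of what this enables, using the `NereidsParser.parseSingle` entry point that appears later in this patch; the LOAD statement shape here is illustrative, not copied from a test:

```java
import org.apache.doris.nereids.parser.NereidsParser;

public class FormatClauseSketch {
    public static void main(String[] args) {
        NereidsParser parser = new NereidsParser();
        // Bare-identifier form, accepted both before and after this change:
        String bare = "LOAD LABEL test_db.label1"
                + " (DATA INFILE('s3://bucket/path/*') INTO TABLE tbl1 FORMAT AS csv)"
                + " WITH S3 ('s3.endpoint' = 'http://127.0.0.1:9312')";
        // Quoted form, accepted only now that format is identifierOrStringLiteral:
        String quoted = bare.replace("FORMAT AS csv", "FORMAT AS 'csv'");
        parser.parseSingle(bare);
        parser.parseSingle(quoted); // rejected by the old grammar
    }
}
```

With enable_nereids_load on, this parsed plan is what the new load path hands to InsertJob below.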
@@ -167,6 +162,11 @@ refreshMethod : COMPLETE | AUTO ; +identifierOrStringLiteral + : identifier + | STRING_LITERAL + ; + identifierOrText : errorCapturingIdentifier | STRING_LITERAL @@ -224,7 +224,8 @@ mappingExpr ; withRemoteStorageSystem - : WITH S3 LEFT_PAREN + : resourceDesc + | WITH S3 LEFT_PAREN brokerProperties=propertyItemList RIGHT_PAREN | WITH HDFS LEFT_PAREN diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java index ececccc316..ef76aedba2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java @@ -117,10 +117,8 @@ public class CreateJobStmt extends DdlStmt { analyzerSqlStmt(); // check its insert stmt,currently only support insert stmt //todo when support other stmt,need to check stmt type and generate jobInstance - InsertJob job = new InsertJob(); JobExecutionConfiguration jobExecutionConfiguration = new JobExecutionConfiguration(); jobExecutionConfiguration.setExecuteType(executeType); - job.setCreateTimeMs(System.currentTimeMillis()); TimerDefinition timerDefinition = new TimerDefinition(); if (null != onceJobStartTimestamp) { @@ -148,17 +146,19 @@ public class CreateJobStmt extends DdlStmt { } checkJobName(labelName.getLabelName()); jobExecutionConfiguration.setTimerDefinition(timerDefinition); - job.setJobConfig(jobExecutionConfiguration); - - job.setComment(comment); - job.setCurrentDbName(labelName.getDbName()); - job.setJobName(labelName.getLabelName()); - job.setCreateUser(ConnectContext.get().getCurrentUserIdentity()); - job.setJobStatus(JobStatus.RUNNING); - job.setJobId(Env.getCurrentEnv().getNextId()); String originStmt = getOrigStmt().originStmt; String executeSql = parseExecuteSql(originStmt); - job.setExecuteSql(executeSql); + // create job use label name as its job name + String jobName = labelName.getLabelName(); + InsertJob job = new InsertJob(jobName, + JobStatus.RUNNING, + labelName.getDbName(), + comment, + ConnectContext.get().getCurrentUserIdentity(), + jobExecutionConfiguration, + System.currentTimeMillis(), + executeSql); + //job.checkJobParams(); jobInstance = job; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowLoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowLoadStmt.java index ae8283f6dd..1e00b5d889 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowLoadStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowLoadStmt.java @@ -108,6 +108,13 @@ public class ShowLoadStmt extends ShowStmt { return states; } + public org.apache.doris.load.loadv2.JobState getStateV2() { + if (Strings.isNullOrEmpty(stateValue)) { + return null; + } + return org.apache.doris.load.loadv2.JobState.valueOf(stateValue); + } + public boolean isAccurateMatch() { return isAccurateMatch; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index 9ae7d4f161..cd77f70f59 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -182,6 +182,7 @@ import org.apache.doris.mtmv.MTMVStatus; import org.apache.doris.mysql.privilege.AccessControllerManager; import org.apache.doris.mysql.privilege.Auth; import org.apache.doris.mysql.privilege.PrivPredicate; +import org.apache.doris.nereids.jobs.load.LabelProcessor; import 
org.apache.doris.nereids.trees.plans.commands.info.AlterMTMVPropertyInfo; import org.apache.doris.nereids.trees.plans.commands.info.AlterMTMVRefreshInfo; import org.apache.doris.nereids.trees.plans.commands.info.TableNameInfo; @@ -362,6 +363,7 @@ public class Env { private ExportTaskRegister exportTaskRegister; private JobManager<? extends AbstractJob<?, ?>, ?> jobManager; + private LabelProcessor labelProcessor; private TransientTaskManager transientTaskManager; private MasterDaemon labelCleaner; // To clean old LabelInfo, ExportJobInfos @@ -641,8 +643,10 @@ public class Env { } this.metastoreEventsProcessor = new MetastoreEventsProcessor(); this.jobManager = new JobManager<>(); + this.labelProcessor = new LabelProcessor(); this.transientTaskManager = new TransientTaskManager(); this.exportTaskRegister = new ExportTaskRegister(transientTaskManager); + this.replayedJournalId = new AtomicLong(0L); this.stmtIdCounter = new AtomicLong(0L); this.isElectable = false; @@ -3907,6 +3912,10 @@ public class Env { return jobManager; } + public LabelProcessor getLabelProcessor() { + return labelProcessor; + } + public TransientTaskManager getTransientTaskManager() { return transientTaskManager; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java index 83f02326d8..6e9cb48da1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java @@ -23,6 +23,8 @@ import org.apache.doris.catalog.Env; import org.apache.doris.catalog.ScalarType; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; +import org.apache.doris.common.util.LogBuilder; +import org.apache.doris.common.util.LogKey; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.job.common.JobStatus; import org.apache.doris.job.common.TaskStatus; @@ -76,13 +78,54 @@ public abstract class AbstractJob<T extends AbstractTask, C> implements Job<T, C> + public AbstractJob(Long jobId, String jobName, JobStatus jobStatus, String currentDbName, + String comment, UserIdentity createUser, JobExecutionConfiguration jobConfig, + Long createTimeMs, String executeSql, List<T> runningTasks) { + this.jobId = jobId; + this.jobName = jobName; + this.jobStatus = jobStatus; + this.currentDbName = currentDbName; + this.comment = comment; + this.createUser = createUser; + this.jobConfig = jobConfig; + this.createTimeMs = createTimeMs; + this.executeSql = executeSql; + this.runningTasks = runningTasks; + } private List<T> runningTasks = new ArrayList<>(); @@ -109,6 +152,10 @@ public abstract class AbstractJob<T extends AbstractTask, C> implements Job<T, C> - public void initTasks(Collection<? extends T> tasks) { + public void initTasks(Collection<? extends T> tasks, TaskType taskType) { + if (CollectionUtils.isEmpty(getRunningTasks())) { + runningTasks = new ArrayList<>(); + } tasks.forEach(task -> { - task.setJobId(jobId); - task.setTaskId(getNextId()); + task.setTaskType(taskType); + task.setJobId(getJobId()); task.setCreateTimeMs(System.currentTimeMillis()); task.setStatus(TaskStatus.PENDING); }); - if (CollectionUtils.isEmpty(getRunningTasks())) { - setRunningTasks(new ArrayList<>()); - } - getRunningTasks().addAll((Collection) tasks); + getRunningTasks().addAll(tasks); + this.startTimeMs = System.currentTimeMillis(); } public void checkJobParams() { @@ -208,10 +256,22 @@ public abstract class AbstractJob<T extends AbstractTask, C> implements Job<T, C> - job.setRunningTasks(new ArrayList<>()); + job.runningTasks = new ArrayList<>(); return job; } + public void logCreateOperation() { + Env.getCurrentEnv().getEditLog().logCreateJob(this); + } + + public void logFinalOperation() { + Env.getCurrentEnv().getEditLog().logEndJob(this); + } + + public void logUpdateOperation() { + 
Env.getCurrentEnv().getEditLog().logUpdateJob(this); + } + @Override public void onTaskFail(T task) throws JobException { updateJobStatusIfEnd(); @@ -303,7 +363,19 @@ public abstract class AbstractJob<T extends AbstractTask, C> implements Job<T, C> + @Override + public void onReplayEnd(AbstractJob<T, C> replayJob) throws JobException { + log.info(new LogBuilder(LogKey.SCHEDULER_JOB, getJobId()).add("msg", "replay end scheduler job").build()); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/base/Job.java b/fe/fe-core/src/main/java/org/apache/doris/job/base/Job.java index ee352a0f41..1124e7f2d2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/base/Job.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/Job.java @@ -99,11 +99,34 @@ public interface Job<T extends AbstractTask, C> { /** * Cancels all running tasks of this job. - * * @throws JobException If cancelling a running task fails. */ void cancelAllTasks() throws JobException; + /** + * register job + * @throws JobException If register job failed. + */ + void onRegister() throws JobException; + + /** + * unregister job + * @throws JobException If unregister job failed. + */ + void onUnRegister() throws JobException; + + /** + * replay create job + * @throws JobException If replay create failed. + */ + void onReplayCreate() throws JobException; + + /** + * replay finished or cancelled job + * @throws JobException If replay end failed. + */ + void onReplayEnd(AbstractJob<T, C> replayJob) throws JobException; + /** * Notifies the job when a task execution fails. * diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecuteType.java b/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecuteType.java index ea9ddb3b02..3529a2efef 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecuteType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecuteType.java @@ -37,7 +37,7 @@ public enum JobExecuteType { */ MANUAL, /** - * The job will be executed immediately. + * The job will be executed only once and immediately. */ INSTANT, } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/common/JobStatus.java b/fe/fe-core/src/main/java/org/apache/doris/job/common/JobStatus.java index 2df65e4654..22b799225d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/common/JobStatus.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/common/JobStatus.java @@ -36,7 +36,6 @@ public enum JobStatus { * The stop state cannot be resumed */ STOPPED, - /** * When the task is finished, the finished state will be triggered.
*/ diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/common/JobType.java b/fe/fe-core/src/main/java/org/apache/doris/job/common/JobType.java index 1beb4e0a38..084a39ddf0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/common/JobType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/common/JobType.java @@ -19,5 +19,5 @@ package org.apache.doris.job.common; public enum JobType { INSERT, - MV + MV, } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java index 74581b8f1b..9256864efc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java @@ -17,43 +17,74 @@ package org.apache.doris.job.extensions.insert; +import org.apache.doris.analysis.LoadStmt; +import org.apache.doris.analysis.UserIdentity; +import org.apache.doris.catalog.AuthorizationInfo; import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.ScalarType; import org.apache.doris.common.Config; -import org.apache.doris.common.MetaNotFoundException; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.ErrorCode; +import org.apache.doris.common.ErrorReport; +import org.apache.doris.common.FeConstants; +import org.apache.doris.common.LabelAlreadyUsedException; import org.apache.doris.common.io.Text; +import org.apache.doris.common.util.LogBuilder; +import org.apache.doris.common.util.LogKey; +import org.apache.doris.common.util.TimeUtils; import org.apache.doris.job.base.AbstractJob; import org.apache.doris.job.base.JobExecuteType; +import org.apache.doris.job.base.JobExecutionConfiguration; +import org.apache.doris.job.common.JobStatus; import org.apache.doris.job.common.JobType; import org.apache.doris.job.common.TaskType; import org.apache.doris.job.exception.JobException; -import org.apache.doris.load.loadv2.LoadJob; +import org.apache.doris.load.FailMsg; +import org.apache.doris.load.loadv2.LoadStatistic; +import org.apache.doris.mysql.privilege.PrivPredicate; +import org.apache.doris.mysql.privilege.Privilege; import org.apache.doris.nereids.trees.plans.commands.InsertIntoTableCommand; import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.ShowResultSetMetaData; import org.apache.doris.qe.StmtExecutor; +import org.apache.doris.thrift.TUniqueId; +import org.apache.doris.transaction.ErrorTabletInfo; +import org.apache.doris.transaction.TabletCommitInfo; +import com.google.common.base.Joiner; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; import com.google.gson.annotations.SerializedName; import lombok.Data; +import lombok.EqualsAndHashCode; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; +import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; +import 
java.util.stream.Collectors; +@EqualsAndHashCode(callSuper = true) @Data @Slf4j -public class InsertJob extends AbstractJob<InsertTask> { +public class InsertJob extends AbstractJob<InsertTask, Map<Object, Object>> { public static final ImmutableList<Column> SCHEMA = ImmutableList.of( new Column("Id", ScalarType.createStringType()), @@ -66,40 +97,145 @@ public class InsertJob extends AbstractJob<InsertTask> { new Column("CreateTime", ScalarType.createStringType()), new Column("Comment", ScalarType.createStringType())); + private static final ShowResultSetMetaData TASK_META_DATA = + ShowResultSetMetaData.builder() + .addColumn(new Column("TaskId", ScalarType.createVarchar(80))) + .addColumn(new Column("Label", ScalarType.createVarchar(80))) + .addColumn(new Column("Status", ScalarType.createVarchar(20))) + .addColumn(new Column("EtlInfo", ScalarType.createVarchar(100))) + .addColumn(new Column("TaskInfo", ScalarType.createVarchar(100))) + .addColumn(new Column("ErrorMsg", ScalarType.createVarchar(100))) + + .addColumn(new Column("CreateTimeMs", ScalarType.createVarchar(20))) + .addColumn(new Column("FinishTimeMs", ScalarType.createVarchar(20))) + .addColumn(new Column("TrackingUrl", ScalarType.createVarchar(200))) + .addColumn(new Column("LoadStatistic", ScalarType.createVarchar(200))) + .addColumn(new Column("User", ScalarType.createVarchar(50))) + .build(); + public static final ImmutableMap<String, Integer> COLUMN_TO_INDEX; static { - ImmutableMap.Builder<String, Integer> builder = new ImmutableMap.Builder(); + ImmutableMap.Builder<String, Integer> builder = new ImmutableMap.Builder<>(); for (int i = 0; i < SCHEMA.size(); i++) { builder.put(SCHEMA.get(i).getName().toLowerCase(), i); } COLUMN_TO_INDEX = builder.build(); } - @SerializedName(value = "lp") - String labelPrefix; - - InsertIntoTableCommand command; - - StmtExecutor stmtExecutor; - - ConnectContext ctx; - @SerializedName("tis") ConcurrentLinkedQueue<Long> historyTaskIdList; + @SerializedName("did") + private final long dbId; + @SerializedName("ln") + private String labelName; + @SerializedName("lt") + private InsertJob.LoadType loadType; + // 0: the job status is pending + // n/100: n is the number of tasks that have finished + // 99: all tasks have been finished + // 100: txn status is visible and load has been finished + @SerializedName("pg") + private int progress; + @SerializedName("fm") + private FailMsg failMsg; + @SerializedName("plans") + private List<InsertIntoTableCommand> plans = new ArrayList<>(); + private LoadStatistic loadStatistic = new LoadStatistic(); + private Set<Long> finishedTaskIds = new HashSet<>(); + private ConcurrentHashMap<Long, InsertTask> idToTasks = new ConcurrentHashMap<>(); + private Map<String, String> properties = new HashMap<>(); + private Set<String> tableNames; + private AuthorizationInfo authorizationInfo; + + private ConnectContext ctx; + private StmtExecutor stmtExecutor; + private List<ErrorTabletInfo> errorTabletInfos = new ArrayList<>(); + private List<TabletCommitInfo> commitInfos = new ArrayList<>(); + + // max number of task ids to save; consider making this configurable
+ private static final int MAX_SAVE_TASK_NUM = 100; + + /** + * load job type + */ + public enum LoadType { + BULK, + SPARK, + LOCAL_FILE, + UNKNOWN + + } + + public enum Priority { + HIGH(0), + NORMAL(1), + LOW(2); + + Priority(int value) { + this.value = value; + } + + private final int value; + + public int getValue() { + return value; + } + } + + public InsertJob(String jobName, + JobStatus jobStatus, + String dbName, + String comment, + UserIdentity createUser, + JobExecutionConfiguration jobConfig, + Long createTimeMs, + String executeSql) { + super(getNextJobId(), jobName, jobStatus, dbName, comment, createUser, + jobConfig, createTimeMs, executeSql, null); + this.dbId = ConnectContext.get().getCurrentDbId(); + } + + public InsertJob(ConnectContext ctx, + StmtExecutor executor, + String labelName, + List<InsertIntoTableCommand> plans, + Set<String> sinkTableNames, + Map<String, String> properties, + String comment, + JobExecutionConfiguration jobConfig) { + super(getNextJobId(), labelName, JobStatus.RUNNING, null, + comment, ctx.getCurrentUserIdentity(), jobConfig); + this.ctx = ctx; + this.plans = plans; + this.stmtExecutor = executor; + this.dbId = ctx.getCurrentDbId(); + this.labelName = labelName; + this.tableNames = sinkTableNames; + this.properties = properties; + // TODO: other load types are not supported yet + this.loadType = InsertJob.LoadType.BULK; + } @Override - public List<InsertTask> createTasks(TaskType taskType, Map<Object, Object> taskContext) { - //nothing need to do in insert job - InsertTask task = new InsertTask(null, getCurrentDbName(), getExecuteSql(), getCreateUser()); - task.setJobId(getJobId()); - task.setTaskType(taskType); - task.setTaskId(Env.getCurrentEnv().getNextId()); - ArrayList<InsertTask> tasks = new ArrayList<>(); - tasks.add(task); - super.initTasks(tasks); - recordTask(task.getTaskId()); - return tasks; + public List<InsertTask> createTasks(TaskType taskType, Map<Object, Object> taskContext) { + if (plans.isEmpty()) { + InsertTask task = new InsertTask(labelName, getCurrentDbName(), getExecuteSql(), getCreateUser()); + idToTasks.put(task.getTaskId(), task); + recordTask(task.getTaskId()); + } else { + // used for LOAD statements + for (InsertIntoTableCommand logicalPlan : plans) { + if (!logicalPlan.getLabelName().isPresent()) { + throw new IllegalArgumentException("Load plan needs a label name."); + } + InsertTask task = new InsertTask(logicalPlan, ctx, stmtExecutor, loadStatistic); + idToTasks.put(task.getTaskId(), task); + recordTask(task.getTaskId()); + } + } + initTasks(idToTasks.values(), taskType); + return new ArrayList<>(idToTasks.values()); } public void recordTask(long id) { @@ -116,7 +252,6 @@ public class InsertJob extends AbstractJob<InsertTask> { if (historyTaskIdList.size() >= Config.max_persistence_task_count) { historyTaskIdList.poll(); } - Env.getCurrentEnv().getEditLog().logUpdateJob(this); } @Override @@ -125,23 +260,27 @@ public class InsertJob extends AbstractJob<InsertTask> { } @Override - public boolean isReadyForScheduling(Map<Object, Object> taskContext) { + public void cancelAllTasks() throws JobException { + try { + checkAuth("CANCEL LOAD"); + super.cancelAllTasks(); + this.failMsg = new FailMsg(FailMsg.CancelType.USER_CANCEL, "user cancel"); + } catch (DdlException e) { + throw new JobException(e); + } + } + + @Override + public boolean isReadyForScheduling(Map<Object, Object> taskContext) { return CollectionUtils.isEmpty(getRunningTasks()); } - - @Override - public void cancelAllTasks() throws JobException { - super.cancelAllTasks(); - } - - @Override protected void checkJobParamsInternal() { - if (command == null && StringUtils.isBlank(getExecuteSql())) { + if (plans.isEmpty() && 
StringUtils.isBlank(getExecuteSql())) { throw new IllegalArgumentException("command or sql is null,must be set"); } - if (null != command && !getJobConfig().getExecuteType().equals(JobExecuteType.INSTANT)) { + if (!plans.isEmpty() && !getJobConfig().getExecuteType().equals(JobExecuteType.INSTANT)) { throw new IllegalArgumentException("command must be null when executeType is not instant"); } } @@ -153,27 +292,22 @@ public class InsertJob extends AbstractJob { } //TODO it's will be refactor, we will storage task info in job inner and query from it List taskIdList = new ArrayList<>(this.historyTaskIdList); + Collections.reverse(taskIdList); - List loadJobs = Env.getCurrentEnv().getLoadManager().queryLoadJobsByJobIds(taskIdList); - if (CollectionUtils.isEmpty(loadJobs)) { + return queryLoadTasksByTaskIds(taskIdList); + } + + public List queryLoadTasksByTaskIds(List taskIdList) { + if (taskIdList.isEmpty()) { return new ArrayList<>(); } - List tasks = new ArrayList<>(); - loadJobs.forEach(loadJob -> { - InsertTask task; - try { - task = new InsertTask(loadJob.getLabel(), loadJob.getDb().getFullName(), null, getCreateUser()); - task.setCreateTimeMs(loadJob.getCreateTimestamp()); - } catch (MetaNotFoundException e) { - log.warn("load job not found, job id is {}", loadJob.getId()); - return; + List jobs = new ArrayList<>(); + taskIdList.forEach(id -> { + if (null != idToTasks.get(id)) { + jobs.add(idToTasks.get(id)); } - task.setJobId(getJobId()); - task.setTaskId(loadJob.getId()); - task.setLoadJob(loadJob); - tasks.add(task); }); - return tasks; + return jobs; } @Override @@ -193,6 +327,12 @@ public class InsertJob extends AbstractJob { @Override public void onTaskFail(InsertTask task) { + try { + updateJobStatus(JobStatus.STOPPED); + this.failMsg = new FailMsg(FailMsg.CancelType.LOAD_RUN_FAIL, task.getErrMsg()); + } catch (JobException e) { + throw new RuntimeException(e); + } getRunningTasks().remove(task); } @@ -203,7 +343,129 @@ public class InsertJob extends AbstractJob { @Override public List getShowInfo() { - return super.getCommonShowInfo(); + try { + // check auth + checkAuth("SHOW LOAD"); + List jobInfo = Lists.newArrayList(); + // jobId + jobInfo.add(getJobId().toString()); + // label + if (StringUtils.isEmpty(getLabelName())) { + jobInfo.add(FeConstants.null_string); + } else { + jobInfo.add(getLabelName()); + } + // state + if (getJobStatus() == JobStatus.STOPPED) { + jobInfo.add("CANCELLED"); + } else { + jobInfo.add(getJobStatus().name()); + } + + // progress + String progress = Env.getCurrentProgressManager().getProgressInfo(String.valueOf(getJobId())); + switch (getJobStatus()) { + case RUNNING: + if (isPending()) { + jobInfo.add("ETL:0%; LOAD:0%"); + } else { + jobInfo.add("ETL:100%; LOAD:" + progress + "%"); + } + break; + case FINISHED: + jobInfo.add("ETL:100%; LOAD:100%"); + break; + case STOPPED: + default: + jobInfo.add("ETL:N/A; LOAD:N/A"); + break; + } + // type + jobInfo.add(loadType.name()); + + // etl info + if (loadStatistic.getCounters().size() == 0) { + jobInfo.add(FeConstants.null_string); + } else { + jobInfo.add(Joiner.on("; ").withKeyValueSeparator("=").join(loadStatistic.getCounters())); + } + + // task info + jobInfo.add("cluster:" + getResourceName() + "; timeout(s):" + getTimeout() + + "; max_filter_ratio:" + getMaxFilterRatio() + "; priority:" + getPriority()); + // error msg + if (failMsg == null) { + jobInfo.add(FeConstants.null_string); + } else { + jobInfo.add("type:" + failMsg.getCancelType() + "; msg:" + failMsg.getMsg()); + } + + // create time + 
jobInfo.add(TimeUtils.longToTimeString(getCreateTimeMs())); + // etl start time + jobInfo.add(TimeUtils.longToTimeString(getStartTimeMs())); + // etl end time + jobInfo.add(TimeUtils.longToTimeString(getStartTimeMs())); + // load start time + jobInfo.add(TimeUtils.longToTimeString(getStartTimeMs())); + // load end time + jobInfo.add(TimeUtils.longToTimeString(getFinishTimeMs())); + // tracking urls + List trackingUrl = idToTasks.values().stream() + .map(task -> { + if (StringUtils.isNotEmpty(task.getTrackingUrl())) { + return task.getTrackingUrl(); + } else { + return FeConstants.null_string; + } + }) + .collect(Collectors.toList()); + if (trackingUrl.isEmpty()) { + jobInfo.add(FeConstants.null_string); + } else { + jobInfo.add(trackingUrl.toString()); + } + // job details + jobInfo.add(loadStatistic.toJson()); + // transaction id + jobInfo.add(String.valueOf(0)); + // error tablets + jobInfo.add(errorTabletsToJson()); + // user, some load job may not have user info + if (getCreateUser() == null || getCreateUser().getQualifiedUser() == null) { + jobInfo.add(FeConstants.null_string); + } else { + jobInfo.add(getCreateUser().getQualifiedUser()); + } + // comment + jobInfo.add(getComment()); + return jobInfo; + } catch (DdlException e) { + throw new RuntimeException(e); + } + } + + private String getPriority() { + return properties.getOrDefault(LoadStmt.PRIORITY, Priority.NORMAL.name()); + } + + public double getMaxFilterRatio() { + return Double.parseDouble(properties.getOrDefault(LoadStmt.MAX_FILTER_RATIO_PROPERTY, "0.0")); + } + + public long getTimeout() { + if (properties.containsKey(LoadStmt.TIMEOUT_PROPERTY)) { + return Long.parseLong(properties.get(LoadStmt.TIMEOUT_PROPERTY)); + } + return Config.broker_load_default_timeout_second; + } + + + public static InsertJob readFields(DataInput in) throws IOException { + String jsonJob = Text.readString(in); + InsertJob job = GsonUtils.GSON.fromJson(jsonJob, InsertJob.class); + job.setRunningTasks(new ArrayList<>()); + return job; } @Override @@ -211,19 +473,129 @@ public class InsertJob extends AbstractJob { Text.writeString(out, GsonUtils.GSON.toJson(this)); } - private static final ShowResultSetMetaData TASK_META_DATA = - ShowResultSetMetaData.builder() - .addColumn(new Column("TaskId", ScalarType.createVarchar(20))) - .addColumn(new Column("Label", ScalarType.createVarchar(20))) - .addColumn(new Column("Status", ScalarType.createVarchar(20))) - .addColumn(new Column("EtlInfo", ScalarType.createVarchar(20))) - .addColumn(new Column("TaskInfo", ScalarType.createVarchar(20))) - .addColumn(new Column("ErrorMsg", ScalarType.createVarchar(20))) + public String errorTabletsToJson() { + Map map = new HashMap<>(); + errorTabletInfos.stream().limit(Config.max_error_tablet_of_broker_load) + .forEach(p -> map.put(p.getTabletId(), p.getMsg())); + Gson gson = new GsonBuilder().disableHtmlEscaping().create(); + return gson.toJson(map); + } - .addColumn(new Column("CreateTimeMs", ScalarType.createVarchar(20))) - .addColumn(new Column("FinishTimeMs", ScalarType.createVarchar(20))) - .addColumn(new Column("TrackingUrl", ScalarType.createVarchar(20))) - .addColumn(new Column("LoadStatistic", ScalarType.createVarchar(20))) - .addColumn(new Column("User", ScalarType.createVarchar(20))) - .build(); + public void updateLoadingStatus(Long beId, TUniqueId loadId, TUniqueId fragmentId, long scannedRows, + long scannedBytes, boolean isDone) { + loadStatistic.updateLoadProgress(beId, loadId, fragmentId, scannedRows, scannedBytes, isDone); + progress = (int) ((double) 
finishedTaskIds.size() / idToTasks.size() * 100); + if (progress == 100) { + progress = 99; + } + } + + private void checkAuth(String command) throws DdlException { + if (authorizationInfo == null) { + // use the old method to check priv + checkAuthWithoutAuthInfo(command); + return; + } + if (!Env.getCurrentEnv().getAccessManager().checkPrivByAuthInfo(ConnectContext.get(), authorizationInfo, + PrivPredicate.LOAD)) { + ErrorReport.reportDdlException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, + Privilege.LOAD_PRIV); + } + } + + /** + * This method is compatible with old load job without authorization info + * If db or table name could not be found by id, it will throw the NOT_EXISTS_ERROR + * + * @throws DdlException + */ + private void checkAuthWithoutAuthInfo(String command) throws DdlException { + Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(dbId); + // check auth + if (tableNames == null || tableNames.isEmpty()) { + // forward compatibility + if (!Env.getCurrentEnv().getAccessManager().checkDbPriv(ConnectContext.get(), db.getFullName(), + PrivPredicate.LOAD)) { + ErrorReport.reportDdlException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, + Privilege.LOAD_PRIV); + } + } else { + for (String tblName : tableNames) { + if (!Env.getCurrentEnv().getAccessManager().checkTblPriv(ConnectContext.get(), db.getFullName(), + tblName, PrivPredicate.LOAD)) { + ErrorReport.reportDdlException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, + command, + ConnectContext.get().getQualifiedUser(), + ConnectContext.get().getRemoteIP(), db.getFullName() + ": " + tblName); + } + } + } + } + + public void unprotectReadEndOperation(InsertJob replayLog) { + setJobStatus(replayLog.getJobStatus()); + progress = replayLog.getProgress(); + setStartTimeMs(replayLog.getStartTimeMs()); + setFinishTimeMs(replayLog.getFinishTimeMs()); + failMsg = replayLog.getFailMsg(); + } + + public String getResourceName() { + // TODO: get tvf param from tvf relation + return "N/A"; + } + + public boolean isRunning() { + return getJobStatus() != JobStatus.FINISHED; + } + + public boolean isPending() { + return getJobStatus() != JobStatus.FINISHED; + } + + public boolean isCancelled() { + return getJobStatus() == JobStatus.STOPPED; + } + + @Override + public void onRegister() throws JobException { + try { + if (StringUtils.isNotEmpty(labelName)) { + Env.getCurrentEnv().getLabelProcessor().addJob(this); + } + } catch (LabelAlreadyUsedException e) { + throw new JobException(e); + } + } + + @Override + public void onUnRegister() throws JobException { + // TODO: need record cancelled jobs in order to show cancelled job + // Env.getCurrentEnv().getLabelProcessor().removeJob(getDbId(), getLabelName()); + } + + @Override + public void onReplayCreate() throws JobException { + JobExecutionConfiguration jobConfig = new JobExecutionConfiguration(); + jobConfig.setExecuteType(JobExecuteType.INSTANT); + setJobConfig(jobConfig); + onRegister(); + checkJobParams(); + log.info(new LogBuilder(LogKey.LOAD_JOB, getJobId()).add("msg", "replay create load job").build()); + } + + @Override + public void onReplayEnd(AbstractJob> replayJob) throws JobException { + if (!(replayJob instanceof InsertJob)) { + return; + } + InsertJob insertJob = (InsertJob) replayJob; + unprotectReadEndOperation(insertJob); + log.info(new LogBuilder(LogKey.LOAD_JOB, + insertJob.getJobId()).add("operation", insertJob).add("msg", "replay end load job").build()); + } + + public int getProgress() { + return progress; + } } diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java index 7e98895026..e85e7a1b02 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java @@ -25,7 +25,8 @@ import org.apache.doris.common.FeConstants; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.job.exception.JobException; import org.apache.doris.job.task.AbstractTask; -import org.apache.doris.load.loadv2.LoadJob; +import org.apache.doris.load.FailMsg; +import org.apache.doris.load.loadv2.LoadStatistic; import org.apache.doris.nereids.parser.NereidsParser; import org.apache.doris.nereids.trees.plans.commands.InsertIntoTableCommand; import org.apache.doris.qe.ConnectContext; @@ -34,12 +35,12 @@ import org.apache.doris.thrift.TCell; import org.apache.doris.thrift.TRow; import org.apache.doris.thrift.TUniqueId; -import com.google.common.base.Joiner; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import lombok.Getter; import lombok.Setter; import lombok.extern.log4j.Log4j2; +import org.apache.commons.lang3.StringUtils; import java.util.Optional; import java.util.UUID; @@ -53,8 +54,6 @@ public class InsertTask extends AbstractTask { new Column("JobId", ScalarType.createStringType()), new Column("Label", ScalarType.createStringType()), new Column("Status", ScalarType.createStringType()), - new Column("EtlInfo", ScalarType.createStringType()), - new Column("TaskInfo", ScalarType.createStringType()), new Column("ErrorMsg", ScalarType.createStringType()), new Column("CreateTimeMs", ScalarType.createStringType()), new Column("FinishTimeMs", ScalarType.createStringType()), @@ -73,30 +72,69 @@ public class InsertTask extends AbstractTask { } private String labelName; - private InsertIntoTableCommand command; - private StmtExecutor stmtExecutor; - private ConnectContext ctx; - private String sql; - private String currentDb; - private UserIdentity userIdentity; - + private LoadStatistic loadStatistic; private AtomicBoolean isCanceled = new AtomicBoolean(false); - private AtomicBoolean isFinished = new AtomicBoolean(false); - private static final String LABEL_SPLITTER = "_"; + private FailMsg failMsg; + @Getter + private String trackingUrl; @Getter @Setter - private LoadJob loadJob; + private InsertJob jobInfo; + private TaskType taskType = TaskType.PENDING; + private MergeType mergeType = MergeType.APPEND; + /** + * task merge type + */ + enum MergeType { + MERGE, + APPEND, + DELETE + } + + /** + * task type + */ + enum TaskType { + UNKNOWN, // this is only for ISSUE #2354 + PENDING, + LOADING, + FINISHED, + FAILED, + CANCELLED + } + + public InsertTask(InsertIntoTableCommand insertInto, + ConnectContext ctx, StmtExecutor executor, LoadStatistic statistic) { + this(insertInto.getLabelName().get(), insertInto, ctx, executor, statistic); + } + + public InsertTask(String labelName, String currentDb, String sql, UserIdentity userIdentity) { + this.labelName = labelName; + this.sql = sql; + this.currentDb = currentDb; + this.userIdentity = userIdentity; + } + + public InsertTask(String labelName, InsertIntoTableCommand insertInto, + ConnectContext ctx, StmtExecutor executor, LoadStatistic statistic) { + this.labelName = labelName; + this.command = insertInto; + this.userIdentity = ctx.getCurrentUserIdentity(); + this.ctx = ctx; + this.stmtExecutor = 
executor; + this.loadStatistic = statistic; + } @Override public void before() throws JobException { @@ -109,15 +147,19 @@ public class InsertTask extends AbstractTask { ctx.setCurrentUserIdentity(userIdentity); ctx.getState().reset(); ctx.setThreadLocalInfo(); - ctx.setDatabase(currentDb); + if (StringUtils.isNotEmpty(currentDb)) { + ctx.setDatabase(currentDb); + } TUniqueId queryId = generateQueryId(UUID.randomUUID().toString()); ctx.getSessionVariable().enableFallbackToOriginalPlanner = false; stmtExecutor = new StmtExecutor(ctx, (String) null); ctx.setQueryId(queryId); - NereidsParser parser = new NereidsParser(); - this.command = (InsertIntoTableCommand) parser.parseSingle(sql); - this.command.setLabelName(Optional.of(getJobId() + LABEL_SPLITTER + getTaskId())); - this.command.setJobId(getTaskId()); + if (StringUtils.isNotEmpty(sql)) { + NereidsParser parser = new NereidsParser(); + this.command = (InsertIntoTableCommand) parser.parseSingle(sql); + this.command.setLabelName(Optional.of(getJobId() + LABEL_SPLITTER + getTaskId())); + this.command.setJobId(getTaskId()); + } super.before(); @@ -128,14 +170,6 @@ public class InsertTask extends AbstractTask { return new TUniqueId(taskId.getMostSignificantBits(), taskId.getLeastSignificantBits()); } - public InsertTask(String labelName, String currentDb, String sql, UserIdentity userIdentity) { - this.labelName = labelName; - this.sql = sql; - this.currentDb = currentDb; - this.userIdentity = userIdentity; - - } - @Override public void run() throws JobException { try { @@ -143,7 +177,7 @@ public class InsertTask extends AbstractTask { log.info("task has been canceled, task id is {}", getTaskId()); return; } - command.run(ctx, stmtExecutor); + command.runWithUpdateInfo(ctx, stmtExecutor, loadStatistic); } catch (Exception e) { log.warn("execute insert task error, job id is {}, task id is {},sql is {}", getJobId(), getTaskId(), sql, e); @@ -178,42 +212,28 @@ public class InsertTask extends AbstractTask { @Override public TRow getTvfInfo() { TRow trow = new TRow(); - if (loadJob == null) { + if (jobInfo == null) { // if task not start, load job is null,return pending task show info return getPendingTaskTVFInfo(); } - trow.addToColumnValue(new TCell().setStringVal(String.valueOf(loadJob.getId()))); + trow.addToColumnValue(new TCell().setStringVal(String.valueOf(jobInfo.getJobId()))); trow.addToColumnValue(new TCell().setStringVal(String.valueOf(getJobId()))); - trow.addToColumnValue(new TCell().setStringVal(loadJob.getLabel())); - trow.addToColumnValue(new TCell().setStringVal(loadJob.getState().name())); - // etl info - String etlInfo = FeConstants.null_string; - if (!loadJob.getLoadingStatus().getCounters().isEmpty()) { - etlInfo = Joiner.on("; ").withKeyValueSeparator("=").join(loadJob.getLoadingStatus().getCounters()); - } - trow.addToColumnValue(new TCell().setStringVal(etlInfo)); - - // task info - String taskInfo = "cluster:" + loadJob.getResourceName() + "; timeout(s):" + loadJob.getTimeout() - + "; max_filter_ratio:" + loadJob.getMaxFilterRatio() + "; priority:" + loadJob.getPriority(); - trow.addToColumnValue(new TCell().setStringVal(taskInfo)); - + trow.addToColumnValue(new TCell().setStringVal(labelName)); + trow.addToColumnValue(new TCell().setStringVal(jobInfo.getJobStatus().name())); // err msg String errMsg = FeConstants.null_string; - if (loadJob.getFailMsg() != null) { - errMsg = "type:" + loadJob.getFailMsg().getCancelType() + "; msg:" + loadJob.getFailMsg().getMsg(); + if (failMsg != null) { + errMsg = "type:" + 
failMsg.getCancelType() + "; msg:" + failMsg.getMsg(); } trow.addToColumnValue(new TCell().setStringVal(errMsg)); - // create time - trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(loadJob.getCreateTimestamp()))); - + trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(jobInfo.getCreateTimeMs()))); // load end time - trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(loadJob.getFinishTimestamp()))); + trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(jobInfo.getFinishTimeMs()))); // tracking url - trow.addToColumnValue(new TCell().setStringVal(loadJob.getLoadingStatus().getTrackingUrl())); - trow.addToColumnValue(new TCell().setStringVal(loadJob.getLoadStatistic().toJson())); - trow.addToColumnValue(new TCell().setStringVal(loadJob.getUserInfo().getQualifiedUser())); + trow.addToColumnValue(new TCell().setStringVal(trackingUrl)); + trow.addToColumnValue(new TCell().setStringVal(loadStatistic.toJson())); + trow.addToColumnValue(new TCell().setStringVal(userIdentity.getQualifiedUser())); return trow; } @@ -225,8 +245,6 @@ public class InsertTask extends AbstractTask { trow.addToColumnValue(new TCell().setStringVal(getJobId() + LABEL_SPLITTER + getTaskId())); trow.addToColumnValue(new TCell().setStringVal(getStatus().name())); trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string)); - trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string)); - trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string)); trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(getCreateTimeMs()))); trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string)); trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string)); diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVJob.java index c500e69329..669a2806a5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVJob.java @@ -125,7 +125,7 @@ public class MTMVJob extends AbstractJob { task.setTaskType(taskType); ArrayList tasks = new ArrayList<>(); tasks.add(task); - super.initTasks(tasks); + super.initTasks(tasks, taskType); return tasks; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java index 776af152c6..814d6b773a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java @@ -17,7 +17,15 @@ package org.apache.doris.job.manager; +import org.apache.doris.analysis.CancelLoadStmt; +import org.apache.doris.analysis.CompoundPredicate; +import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.CaseSensibility; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.PatternMatcher; +import org.apache.doris.common.PatternMatcherWrapper; import org.apache.doris.common.io.Writable; import org.apache.doris.common.util.LogBuilder; import org.apache.doris.common.util.LogKey; @@ -26,44 +34,85 @@ import org.apache.doris.job.common.JobStatus; import org.apache.doris.job.common.JobType; import org.apache.doris.job.common.TaskType; import 
org.apache.doris.job.exception.JobException; +import org.apache.doris.job.extensions.insert.InsertJob; import org.apache.doris.job.scheduler.JobScheduler; import org.apache.doris.job.task.AbstractTask; +import org.apache.doris.load.loadv2.JobState; +import com.google.common.collect.Lists; import lombok.extern.log4j.Log4j2; import org.apache.commons.lang3.StringUtils; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.util.ArrayList; +import java.util.LinkedList; import java.util.List; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; @Log4j2 public class JobManager<T extends AbstractJob<?, C>, C> implements Writable { - private final ConcurrentHashMap<Long, T> jobMap = new ConcurrentHashMap<>(32); - private JobScheduler jobScheduler; + private JobScheduler<T, C> jobScheduler; + + // lock for job map + // the lock is private and must be acquired after the db lock + private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true); + + private void readLock() { + lock.readLock().lock(); + } + + private void readUnlock() { + lock.readLock().unlock(); + } + + private void writeLock() { + lock.writeLock().lock(); + } + + private void writeUnlock() { + lock.writeLock().unlock(); + } public void start() { - jobScheduler = new JobScheduler(jobMap); + jobScheduler = new JobScheduler<T, C>(jobMap); jobScheduler.start(); } - public void registerJob(T job) throws JobException { - job.checkJobParams(); - checkJobNameExist(job.getJobName()); - if (jobMap.get(job.getJobId()) != null) { - throw new JobException("job id exist, jobId:" + job.getJobId()); - } - Env.getCurrentEnv().getEditLog().logCreateJob(job); - jobMap.put(job.getJobId(), job); - //check its need to scheduler - jobScheduler.scheduleOneJob(job); + + /** + * get running job + * + * @param jobId job id + * @return running job + */ + public T getJob(long jobId) { + return jobMap.get(jobId); } + public void registerJob(T job) throws JobException { + writeLock(); + try { + job.onRegister(); + job.checkJobParams(); + checkJobNameExist(job.getJobName()); + if (jobMap.get(job.getJobId()) != null) { + throw new JobException("job id exist, jobId:" + job.getJobId()); + } + jobMap.put(job.getJobId(), job); + //check its need to scheduler + jobScheduler.scheduleOneJob(job); + job.logCreateOperation(); + } finally { + writeUnlock(); + } + } private void checkJobNameExist(String jobName) throws JobException { if (jobMap.values().stream().anyMatch(a -> a.getJobName().equals(jobName))) { @@ -72,11 +121,17 @@ public class JobManager<T extends AbstractJob<?, C>, C> implements Writable { } public void unregisterJob(Long jobId) throws JobException { - checkJobExist(jobId); - jobMap.get(jobId).setJobStatus(JobStatus.STOPPED); - jobMap.get(jobId).cancelAllTasks(); - Env.getCurrentEnv().getEditLog().logDeleteJob(jobMap.get(jobId)); - jobMap.remove(jobId); + writeLock(); + try { + checkJobExist(jobId); + jobMap.get(jobId).setJobStatus(JobStatus.STOPPED); + jobMap.get(jobId).cancelAllTasks(); + jobMap.get(jobId).logFinalOperation(); + jobMap.get(jobId).onUnRegister(); + jobMap.remove(jobId); + } finally { + writeUnlock(); + } } public void unregisterJob(String jobName) throws JobException { @@ -95,7 +150,7 @@ public class JobManager<T extends AbstractJob<?, C>, C> implements Writable { public void alterJobStatus(Long jobId, JobStatus status) throws JobException { checkJobExist(jobId); jobMap.get(jobId).updateJobStatus(status); - Env.getCurrentEnv().getEditLog().logUpdateJob(jobMap.get(jobId)); + jobMap.get(jobId).logUpdateOperation(); } public void 
alterJobStatus(String jobName, JobStatus jobStatus) throws JobException { @@ -169,13 +224,12 @@ public class JobManager, C> implements Writable { jobScheduler.schedulerInstantJob(jobMap.get(jobId), TaskType.MANUAL, context); } - public void replayCreateJob(T job) { + public void replayCreateJob(T job) throws JobException { if (jobMap.containsKey(job.getJobId())) { return; } jobMap.putIfAbsent(job.getJobId(), job); - log.info(new LogBuilder(LogKey.SCHEDULER_JOB, job.getJobId()) - .add("msg", "replay create scheduler job").build()); + job.onReplayCreate(); } /** @@ -187,13 +241,12 @@ public class JobManager, C> implements Writable { .add("msg", "replay update scheduler job").build()); } - public void replayDeleteJob(T job) { - if (null == jobMap.get(job.getJobId())) { + public void replayEndJob(T replayJob) throws JobException { + T job = jobMap.get(replayJob.getJobId()); + if (null == job) { return; } - jobMap.remove(job.getJobId()); - log.info(new LogBuilder(LogKey.SCHEDULER_JOB, job.getJobId()) - .add("msg", "replay delete scheduler job").build()); + job.onReplayEnd(replayJob); } public void cancelTaskById(String jobName, Long taskId) throws JobException { @@ -236,4 +289,135 @@ public class JobManager, C> implements Writable { return jobMap.get(jobId); } + + /** + * get load info by db + * + * @param dbId db id + * @param dbName db name + * @param labelValue label name + * @param accurateMatch accurate match + * @param jobState state + * @return load infos + * @throws AnalysisException ex + */ + public List> getLoadJobInfosByDb(long dbId, String dbName, + String labelValue, + boolean accurateMatch, + JobState jobState) throws AnalysisException { + LinkedList> loadJobInfos = new LinkedList<>(); + if (!Env.getCurrentEnv().getLabelProcessor().existJobs(dbId)) { + return loadJobInfos; + } + readLock(); + try { + List loadJobList = Env.getCurrentEnv().getLabelProcessor() + .filterJobs(dbId, labelValue, accurateMatch); + // check state + for (InsertJob loadJob : loadJobList) { + try { + if (jobState != null && !validState(jobState, loadJob)) { + continue; + } + // add load job info, convert String list to Comparable list + loadJobInfos.add(new ArrayList<>(loadJob.getShowInfo())); + } catch (RuntimeException e) { + // ignore this load job + log.warn("get load job info failed. 
job id: {}", loadJob.getJobId(), e); + } + } + return loadJobInfos; + } finally { + readUnlock(); + } + } + + private static boolean validState(JobState jobState, InsertJob loadJob) { + JobStatus status = loadJob.getJobStatus(); + switch (status) { + case RUNNING: + return jobState == JobState.PENDING || jobState == JobState.ETL + || jobState == JobState.LOADING || jobState == JobState.COMMITTED; + case STOPPED: + return jobState == JobState.CANCELLED; + case FINISHED: + return jobState == JobState.FINISHED; + default: + return false; + } + } + + public void cancelLoadJob(CancelLoadStmt cs) + throws JobException, AnalysisException, DdlException { + String dbName = cs.getDbName(); + String label = cs.getLabel(); + String state = cs.getState(); + CompoundPredicate.Operator operator = cs.getOperator(); + Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(dbName); + // List of load jobs waiting to be cancelled + List unfinishedLoadJob; + readLock(); + try { + List loadJobs = Env.getCurrentEnv().getLabelProcessor().getJobs(db); + List matchLoadJobs = Lists.newArrayList(); + addNeedCancelLoadJob(label, state, operator, loadJobs, matchLoadJobs); + if (matchLoadJobs.isEmpty()) { + throw new JobException("Load job does not exist"); + } + // check state here + unfinishedLoadJob = + matchLoadJobs.stream().filter(InsertJob::isRunning) + .collect(Collectors.toList()); + if (unfinishedLoadJob.isEmpty()) { + throw new JobException("There is no uncompleted job"); + } + } finally { + readUnlock(); + } + for (InsertJob loadJob : unfinishedLoadJob) { + try { + unregisterJob(loadJob.getJobId()); + } catch (JobException e) { + log.warn("Fail to cancel job, its label: {}", loadJob.getLabelName()); + } + } + } + + private static void addNeedCancelLoadJob(String label, String state, + CompoundPredicate.Operator operator, List loadJobs, + List matchLoadJobs) + throws AnalysisException { + PatternMatcher matcher = PatternMatcherWrapper.createMysqlPattern(label, + CaseSensibility.LABEL.getCaseSensibility()); + matchLoadJobs.addAll( + loadJobs.stream() + .filter(job -> !job.isCancelled()) + .filter(job -> { + if (operator != null) { + // compound + boolean labelFilter = + label.contains("%") ? matcher.match(job.getLabelName()) + : job.getLabelName().equalsIgnoreCase(label); + boolean stateFilter = job.getJobStatus().name().equalsIgnoreCase(state); + return CompoundPredicate.Operator.AND.equals(operator) ? labelFilter && stateFilter : + labelFilter || stateFilter; + } + if (StringUtils.isNotEmpty(label)) { + return label.contains("%") ? 
matcher.match(job.getLabelName()) + : job.getLabelName().equalsIgnoreCase(label); + } + if (StringUtils.isNotEmpty(state)) { + return job.getJobStatus().name().equalsIgnoreCase(state); + } + return false; + }).collect(Collectors.toList()) + ); + } + // public void updateJobProgress(Long jobId, Long beId, TUniqueId loadId, TUniqueId fragmentId, long scannedRows, + // long scannedBytes, boolean isDone) { + // AbstractJob job = jobMap.get(jobId); + // if (job != null) { + // job.updateLoadingStatus(beId, loadId, fragmentId, scannedRows, scannedBytes, isDone); + // } + // } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java index 7327183e95..71f6ff1c4f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java @@ -27,6 +27,7 @@ import org.apache.doris.job.exception.JobException; import com.google.gson.annotations.SerializedName; import lombok.Data; import lombok.extern.log4j.Log4j2; +import org.apache.commons.lang3.RandomUtils; @Data @Log4j2 @@ -52,6 +53,15 @@ public abstract class AbstractTask implements Task { @SerializedName(value = "emg") private String errMsg; + public AbstractTask() { + taskId = getNextTaskId(); + } + + private static long getNextTaskId() { + // do not use Env.getNextId(), just generate id without logging + return System.nanoTime() + RandomUtils.nextInt(); + } + @Override public void onFail(String msg) throws JobException { status = TaskStatus.FAILED; diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java index 996c3ccbb3..fc2f6fca9c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java @@ -73,6 +73,7 @@ import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.OriginStatement; import org.apache.doris.qe.SessionVariable; import org.apache.doris.scheduler.exception.JobException; +import org.apache.doris.scheduler.executor.TransientTaskExecutor; import org.apache.doris.thrift.TNetworkAddress; import com.google.common.base.Preconditions; @@ -198,7 +199,7 @@ public class ExportJob implements Writable { private List jobExecutorList; - private ConcurrentHashMap taskIdToExecutor = new ConcurrentHashMap<>(); + private ConcurrentHashMap taskIdToExecutor = new ConcurrentHashMap<>(); private Integer finishedTaskCount = 0; private List> allOutfileInfo = Lists.newArrayList(); @@ -380,6 +381,10 @@ public class ExportJob implements Writable { return statementBase; } + public List getTaskExecutors() { + return jobExecutorList; + } + private void generateExportJobExecutor() { jobExecutorList = Lists.newArrayList(); for (List selectStmts : selectStmtListPerParallel) { @@ -607,7 +612,7 @@ public class ExportJob implements Writable { // we need cancel all task taskIdToExecutor.keySet().forEach(id -> { try { - Env.getCurrentEnv().getExportTaskRegister().cancelTask(id); + Env.getCurrentEnv().getTransientTaskManager().cancelMemoryTask(id); } catch (JobException e) { LOG.warn("cancel export task {} exception: {}", id, e); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java b/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java index ae7a175b89..90be7f9f10 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java @@ -59,23 +59,26 @@ import java.util.stream.Collectors; public class ExportMgr { private static final Logger LOG = LogManager.getLogger(ExportJob.class); - - // lock for export job - // lock is private and must use after db lock - private ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true); - private Map exportIdToJob = Maps.newHashMap(); // exportJobId to exportJob // dbid ->
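A closing note on the `progress` encoding that `InsertJob` persists (see the field comment in InsertJob.java above): 0 means pending, n is the percentage of finished tasks, 99 means all tasks finished, and 100 is reserved for "transaction visible", which is why `updateLoadingStatus` clamps a fully finished task set to 99. A standalone sketch of that arithmetic, with a hypothetical helper that is not part of the patch:

```java
public final class ProgressEncodingSketch {
    // Mirrors the computation in InsertJob.updateLoadingStatus: progress is
    // the percentage of finished tasks, but 100 is reserved for "txn visible",
    // so a fully finished task set reports 99 until the commit is visible.
    static int encodeProgress(int finishedTasks, int totalTasks) {
        if (totalTasks == 0) {
            return 0; // pending: no tasks created yet
        }
        int pct = (int) ((double) finishedTasks / totalTasks * 100);
        return pct == 100 ? 99 : pct;
    }

    public static void main(String[] args) {
        System.out.println(encodeProgress(0, 4)); // 0
        System.out.println(encodeProgress(2, 4)); // 50
        System.out.println(encodeProgress(4, 4)); // 99, never 100 here
    }
}
```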