From 509cfea99ad55fa9d45fe7237fe2cbe5aa9d5a1a Mon Sep 17 00:00:00 2001
From: slothever <18522955+wsjz@users.noreply.github.com>
Date: Tue, 26 Dec 2023 12:29:05 +0800
Subject: [PATCH] [feature](Load)(step2) support nereids load job schedule (#26356)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

We integrate the new load job manager into the new job scheduling framework so that
insert tasks can be scheduled after a broker load SQL statement is converted to an
insert-into-TVF (table value function) SQL statement.

issue: https://github.com/apache/doris/issues/24221

Now supported:
1. Load data through an insert-into-TVF SQL statement, but only for simple loads
   (the columns must already be defined in the table).
2. SHOW LOAD statement
   - job id, label name, job state, time info
   - simple progress
3. CANCEL LOAD from a database.
4. Enable the new load path through Config.enable_nereids_load.
5. Jobs can be replayed after a Doris FE restart.

TODO:
- support partitioned insert jobs
- support showing statistics from BE
- support multiple tasks and collect task statistics
- support transactional tasks
- add unit test cases
---
 .../java/org/apache/doris/common/Config.java | 12 +
 .../org/apache/doris/nereids/DorisParser.g4 | 15 +-
 .../apache/doris/analysis/CreateJobStmt.java | 22 +-
 .../apache/doris/analysis/ShowLoadStmt.java | 7 +
 .../java/org/apache/doris/catalog/Env.java | 9 +
 .../apache/doris/job/base/AbstractJob.java | 98 +++-
 .../java/org/apache/doris/job/base/Job.java | 25 +-
 .../apache/doris/job/base/JobExecuteType.java | 2 +-
 .../apache/doris/job/common/JobStatus.java | 1 -
 .../org/apache/doris/job/common/JobType.java | 2 +-
 .../job/extensions/insert/InsertJob.java | 504 +++++++++++++++---
 .../job/extensions/insert/InsertTask.java | 128 +++--
 .../doris/job/extensions/mtmv/MTMVJob.java | 2 +-
 .../apache/doris/job/manager/JobManager.java | 238 ++++++++-
 .../apache/doris/job/task/AbstractTask.java | 10 +
 .../java/org/apache/doris/load/ExportJob.java | 9 +-
 .../java/org/apache/doris/load/ExportMgr.java | 26 +-
 .../apache/doris/load/ExportTaskExecutor.java | 6 +
 .../org/apache/doris/load/loadv2/LoadJob.java | 105 ----
 .../apache/doris/load/loadv2/LoadManager.java | 8 +-
 .../doris/load/loadv2/LoadStatistic.java | 142 +++++
 .../nereids/jobs/load/LabelProcessor.java | 181 +++++++
 .../nereids/parser/LogicalPlanBuilder.java | 14 +-
 .../trees/plans/commands/InsertExecutor.java | 13 +-
 .../commands/InsertIntoTableCommand.java | 15 +
 .../trees/plans/commands/LoadCommand.java | 51 +-
 .../org/apache/doris/persist/EditLog.java | 30 +-
 .../java/org/apache/doris/qe/DdlExecutor.java | 5 +-
 .../org/apache/doris/qe/ShowExecutor.java | 17 +-
 .../executor/TransientTaskExecutor.java | 2 +
 .../manager/TransientTaskManager.java | 5 +-
 .../registry/ExportTaskRegister.java | 2 +-
 .../suites/job_p0/test_base_insert_job.groovy | 2 +-
 33 files changed, 1329 insertions(+), 379 deletions(-)
 create mode 100644 fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadStatistic.java
 create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/load/LabelProcessor.java

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 04d33fd4cc..c038a9f178 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1931,6 +1931,18 @@ public class Config extends ConfigBase {
     @ConfField(masterOnly = true)
     public static boolean enable_hms_events_incremental_sync = false;
+    /**
+     * If set to 
true, doris will try to parse the ddl of a hive view and try to execute the query + * otherwise it will throw an AnalysisException. + */ + @ConfField(mutable = true, varType = VariableAnnotation.EXPERIMENTAL, description = { + "当前默认设置为 false,开启后支持使用新优化器的load语句导入数据,失败后会降级旧的load语句。", + "Now default set to true, After this function is enabled, the load statement of " + + "the new optimizer can be used to import data. If this function fails, " + + "the old load statement will be degraded."}) + public static boolean enable_nereids_load = false; + + /** * Maximum number of events to poll in each RPC. */ diff --git a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 index a2f90ea216..dc202fef74 100644 --- a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 +++ b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 @@ -71,11 +71,6 @@ statement (withRemoteStorageSystem)? (PROPERTIES LEFT_PAREN properties=propertyItemList RIGHT_PAREN)? (commentSpec)? #load - | LOAD LABEL lableName=identifier - LEFT_PAREN dataDescs+=dataDesc (COMMA dataDescs+=dataDesc)* RIGHT_PAREN - resourceDesc - (PROPERTIES LEFT_PAREN properties=propertyItemList RIGHT_PAREN)? - (commentSpec)? #resourceLoad | LOAD mysqlDataDesc (PROPERTIES LEFT_PAREN properties=propertyItemList RIGHT_PAREN)? (commentSpec)? #mysqlLoad @@ -131,7 +126,7 @@ dataDesc (PARTITION partition=identifierList)? (COLUMNS TERMINATED BY comma=STRING_LITERAL)? (LINES TERMINATED BY separator=STRING_LITERAL)? - (FORMAT AS format=identifier)? + (FORMAT AS format=identifierOrStringLiteral)? (columns=identifierList)? (columnsFromPath=colFromPath)? (columnMapping=colMappingList)? @@ -167,6 +162,11 @@ refreshMethod : COMPLETE | AUTO ; +identifierOrStringLiteral + : identifier + | STRING_LITERAL + ; + identifierOrText : errorCapturingIdentifier | STRING_LITERAL @@ -224,7 +224,8 @@ mappingExpr ; withRemoteStorageSystem - : WITH S3 LEFT_PAREN + : resourceDesc + | WITH S3 LEFT_PAREN brokerProperties=propertyItemList RIGHT_PAREN | WITH HDFS LEFT_PAREN diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java index ececccc316..ef76aedba2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java @@ -117,10 +117,8 @@ public class CreateJobStmt extends DdlStmt { analyzerSqlStmt(); // check its insert stmt,currently only support insert stmt //todo when support other stmt,need to check stmt type and generate jobInstance - InsertJob job = new InsertJob(); JobExecutionConfiguration jobExecutionConfiguration = new JobExecutionConfiguration(); jobExecutionConfiguration.setExecuteType(executeType); - job.setCreateTimeMs(System.currentTimeMillis()); TimerDefinition timerDefinition = new TimerDefinition(); if (null != onceJobStartTimestamp) { @@ -148,17 +146,19 @@ public class CreateJobStmt extends DdlStmt { } checkJobName(labelName.getLabelName()); jobExecutionConfiguration.setTimerDefinition(timerDefinition); - job.setJobConfig(jobExecutionConfiguration); - - job.setComment(comment); - job.setCurrentDbName(labelName.getDbName()); - job.setJobName(labelName.getLabelName()); - job.setCreateUser(ConnectContext.get().getCurrentUserIdentity()); - job.setJobStatus(JobStatus.RUNNING); - job.setJobId(Env.getCurrentEnv().getNextId()); String originStmt = getOrigStmt().originStmt; String 
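To make the new config switch and grammar concrete, the following Java sketch shows how a broker LOAD statement is expected to enter the new path. The SQL literal, variable names and surrounding wiring are illustrative assumptions; only Config.enable_nereids_load, NereidsParser and the LOAD-to-TVF rewrite come from this patch.

    // Sketch only: assumes the usual imports (Config, NereidsParser, LogicalPlan) and a valid session.
    String loadSql = "LOAD LABEL test_label "
            + "(DATA INFILE('s3://bucket/demo.csv') INTO TABLE t1 FORMAT AS 'csv') "
            + "WITH S3 ('s3.endpoint' = 'http://127.0.0.1:9000')";
    if (Config.enable_nereids_load) {
        // Parsed as a LoadCommand, which rewrites every data description into an
        // INSERT INTO ... SELECT ... FROM S3(...) TVF plan and submits it as an InsertJob;
        // if that fails, execution degrades to the legacy load path.
        LogicalPlan plan = new NereidsParser().parseSingle(loadSql);
    }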
executeSql = parseExecuteSql(originStmt); - job.setExecuteSql(executeSql); + // create job use label name as its job name + String jobName = labelName.getLabelName(); + InsertJob job = new InsertJob(jobName, + JobStatus.RUNNING, + labelName.getDbName(), + comment, + ConnectContext.get().getCurrentUserIdentity(), + jobExecutionConfiguration, + System.currentTimeMillis(), + executeSql); + //job.checkJobParams(); jobInstance = job; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowLoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowLoadStmt.java index ae8283f6dd..1e00b5d889 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowLoadStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowLoadStmt.java @@ -108,6 +108,13 @@ public class ShowLoadStmt extends ShowStmt { return states; } + public org.apache.doris.load.loadv2.JobState getStateV2() { + if (Strings.isNullOrEmpty(stateValue)) { + return null; + } + return org.apache.doris.load.loadv2.JobState.valueOf(stateValue); + } + public boolean isAccurateMatch() { return isAccurateMatch; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index 9ae7d4f161..cd77f70f59 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -182,6 +182,7 @@ import org.apache.doris.mtmv.MTMVStatus; import org.apache.doris.mysql.privilege.AccessControllerManager; import org.apache.doris.mysql.privilege.Auth; import org.apache.doris.mysql.privilege.PrivPredicate; +import org.apache.doris.nereids.jobs.load.LabelProcessor; import org.apache.doris.nereids.trees.plans.commands.info.AlterMTMVPropertyInfo; import org.apache.doris.nereids.trees.plans.commands.info.AlterMTMVRefreshInfo; import org.apache.doris.nereids.trees.plans.commands.info.TableNameInfo; @@ -362,6 +363,7 @@ public class Env { private ExportTaskRegister exportTaskRegister; private JobManager, ?> jobManager; + private LabelProcessor labelProcessor; private TransientTaskManager transientTaskManager; private MasterDaemon labelCleaner; // To clean old LabelInfo, ExportJobInfos @@ -641,8 +643,11 @@ public class Env { } this.metastoreEventsProcessor = new MetastoreEventsProcessor(); this.jobManager = new JobManager<>(); + this.labelProcessor = new LabelProcessor(); this.transientTaskManager = new TransientTaskManager(); this.exportTaskRegister = new ExportTaskRegister(transientTaskManager); + this.transientTaskManager = new TransientTaskManager(); + this.replayedJournalId = new AtomicLong(0L); this.stmtIdCounter = new AtomicLong(0L); this.isElectable = false; @@ -3907,6 +3912,10 @@ public class Env { return jobManager; } + public LabelProcessor getLabelProcessor() { + return labelProcessor; + } + public TransientTaskManager getTransientTaskManager() { return transientTaskManager; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java index 83f02326d8..6e9cb48da1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java @@ -23,6 +23,8 @@ import org.apache.doris.catalog.Env; import org.apache.doris.catalog.ScalarType; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; +import org.apache.doris.common.util.LogBuilder; +import org.apache.doris.common.util.LogKey; 
import org.apache.doris.common.util.TimeUtils; import org.apache.doris.job.common.JobStatus; import org.apache.doris.job.common.TaskStatus; @@ -76,13 +78,54 @@ public abstract class AbstractJob implements Job runningTasks) { + this.jobId = jobId; + this.jobName = jobName; + this.jobStatus = jobStatus; + this.currentDbName = currentDbName; + this.comment = comment; + this.createUser = createUser; + this.jobConfig = jobConfig; + this.createTimeMs = createTimeMs; + this.executeSql = executeSql; + this.runningTasks = runningTasks; + } private List runningTasks = new ArrayList<>(); @@ -109,6 +152,10 @@ public abstract class AbstractJob implements Job implements Job tasks) { + public void initTasks(Collection tasks, TaskType taskType) { + if (CollectionUtils.isEmpty(getRunningTasks())) { + runningTasks = new ArrayList<>(); + } tasks.forEach(task -> { - task.setJobId(jobId); - task.setTaskId(getNextId()); + task.setTaskType(taskType); + task.setJobId(getJobId()); task.setCreateTimeMs(System.currentTimeMillis()); task.setStatus(TaskStatus.PENDING); }); - if (CollectionUtils.isEmpty(getRunningTasks())) { - setRunningTasks(new ArrayList<>()); - } - getRunningTasks().addAll((Collection) tasks); + getRunningTasks().addAll(tasks); + this.startTimeMs = System.currentTimeMillis(); } public void checkJobParams() { @@ -208,10 +256,22 @@ public abstract class AbstractJob implements Job()); + job.runningTasks = new ArrayList<>(); return job; } + public void logCreateOperation() { + Env.getCurrentEnv().getEditLog().logCreateJob(this); + } + + public void logFinalOperation() { + Env.getCurrentEnv().getEditLog().logEndJob(this); + } + + public void logUpdateOperation() { + Env.getCurrentEnv().getEditLog().logUpdateJob(this); + } + @Override public void onTaskFail(T task) throws JobException { updateJobStatusIfEnd(); @@ -303,7 +363,19 @@ public abstract class AbstractJob implements Job replayJob) throws JobException { + log.info(new LogBuilder(LogKey.SCHEDULER_JOB, getJobId()).add("msg", "replay delete scheduler job").build()); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/base/Job.java b/fe/fe-core/src/main/java/org/apache/doris/job/base/Job.java index ee352a0f41..1124e7f2d2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/base/Job.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/Job.java @@ -99,11 +99,34 @@ public interface Job { /** * Cancels all running tasks of this job. - * * @throws JobException If cancelling a running task fails. */ void cancelAllTasks() throws JobException; + /** + * register job + * @throws JobException If register job failed. + */ + void onRegister() throws JobException; + + /** + * register job failed + * @throws JobException If failed. + */ + void onUnRegister() throws JobException; + + /** + * replay create job + * @throws JobException If replay create failed. + */ + void onReplayCreate() throws JobException; + + /** + * replay finished or cancelled job + * @throws JobException If replay end failed. + */ + void onReplayEnd(AbstractJob replayJob) throws JobException; + /** * Notifies the job when a task execution fails. 
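As a rough guide to when the new lifecycle hooks on Job fire, the call order below follows how JobManager and the edit log use them later in this patch; the summary itself is an illustrative sketch, not part of the change.

    // Register path (master FE):
    //   JobManager.registerJob(job) -> job.onRegister() -> job.checkJobParams() -> schedule -> job.logCreateOperation()
    // End path:
    //   JobManager.unregisterJob(id) -> setJobStatus(STOPPED) -> cancelAllTasks() -> logFinalOperation() -> onUnRegister()
    // Replay path (follower FE or restart):
    //   replayCreateJob(job)    -> job.onReplayCreate()
    //   replayEndJob(replayJob) -> jobMap.get(id).onReplayEnd(replayJob)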
* diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecuteType.java b/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecuteType.java index ea9ddb3b02..3529a2efef 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecuteType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecuteType.java @@ -37,7 +37,7 @@ public enum JobExecuteType { */ MANUAL, /** - * The job will be executed immediately. + * The job will be executed only once and immediately. */ INSTANT, } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/common/JobStatus.java b/fe/fe-core/src/main/java/org/apache/doris/job/common/JobStatus.java index 2df65e4654..22b799225d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/common/JobStatus.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/common/JobStatus.java @@ -36,7 +36,6 @@ public enum JobStatus { * The stop state cannot be resumed */ STOPPED, - /** * When the task is finished, the finished state will be triggered. */ diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/common/JobType.java b/fe/fe-core/src/main/java/org/apache/doris/job/common/JobType.java index 1beb4e0a38..084a39ddf0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/common/JobType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/common/JobType.java @@ -19,5 +19,5 @@ package org.apache.doris.job.common; public enum JobType { INSERT, - MV + MV, } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java index 74581b8f1b..9256864efc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java @@ -17,43 +17,74 @@ package org.apache.doris.job.extensions.insert; +import org.apache.doris.analysis.LoadStmt; +import org.apache.doris.analysis.UserIdentity; +import org.apache.doris.catalog.AuthorizationInfo; import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.ScalarType; import org.apache.doris.common.Config; -import org.apache.doris.common.MetaNotFoundException; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.ErrorCode; +import org.apache.doris.common.ErrorReport; +import org.apache.doris.common.FeConstants; +import org.apache.doris.common.LabelAlreadyUsedException; import org.apache.doris.common.io.Text; +import org.apache.doris.common.util.LogBuilder; +import org.apache.doris.common.util.LogKey; +import org.apache.doris.common.util.TimeUtils; import org.apache.doris.job.base.AbstractJob; import org.apache.doris.job.base.JobExecuteType; +import org.apache.doris.job.base.JobExecutionConfiguration; +import org.apache.doris.job.common.JobStatus; import org.apache.doris.job.common.JobType; import org.apache.doris.job.common.TaskType; import org.apache.doris.job.exception.JobException; -import org.apache.doris.load.loadv2.LoadJob; +import org.apache.doris.load.FailMsg; +import org.apache.doris.load.loadv2.LoadStatistic; +import org.apache.doris.mysql.privilege.PrivPredicate; +import org.apache.doris.mysql.privilege.Privilege; import org.apache.doris.nereids.trees.plans.commands.InsertIntoTableCommand; import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.ShowResultSetMetaData; import 
org.apache.doris.qe.StmtExecutor; +import org.apache.doris.thrift.TUniqueId; +import org.apache.doris.transaction.ErrorTabletInfo; +import org.apache.doris.transaction.TabletCommitInfo; +import com.google.common.base.Joiner; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; import com.google.gson.annotations.SerializedName; import lombok.Data; +import lombok.EqualsAndHashCode; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; +import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.stream.Collectors; +@EqualsAndHashCode(callSuper = true) @Data @Slf4j -public class InsertJob extends AbstractJob { +public class InsertJob extends AbstractJob> { public static final ImmutableList SCHEMA = ImmutableList.of( new Column("Id", ScalarType.createStringType()), @@ -66,40 +97,145 @@ public class InsertJob extends AbstractJob { new Column("CreateTime", ScalarType.createStringType()), new Column("Comment", ScalarType.createStringType())); + private static final ShowResultSetMetaData TASK_META_DATA = + ShowResultSetMetaData.builder() + .addColumn(new Column("TaskId", ScalarType.createVarchar(80))) + .addColumn(new Column("Label", ScalarType.createVarchar(80))) + .addColumn(new Column("Status", ScalarType.createVarchar(20))) + .addColumn(new Column("EtlInfo", ScalarType.createVarchar(100))) + .addColumn(new Column("TaskInfo", ScalarType.createVarchar(100))) + .addColumn(new Column("ErrorMsg", ScalarType.createVarchar(100))) + + .addColumn(new Column("CreateTimeMs", ScalarType.createVarchar(20))) + .addColumn(new Column("FinishTimeMs", ScalarType.createVarchar(20))) + .addColumn(new Column("TrackingUrl", ScalarType.createVarchar(200))) + .addColumn(new Column("LoadStatistic", ScalarType.createVarchar(200))) + .addColumn(new Column("User", ScalarType.createVarchar(50))) + .build(); + public static final ImmutableMap COLUMN_TO_INDEX; static { - ImmutableMap.Builder builder = new ImmutableMap.Builder(); + ImmutableMap.Builder builder = new ImmutableMap.Builder<>(); for (int i = 0; i < SCHEMA.size(); i++) { builder.put(SCHEMA.get(i).getName().toLowerCase(), i); } COLUMN_TO_INDEX = builder.build(); } - @SerializedName(value = "lp") - String labelPrefix; - - InsertIntoTableCommand command; - - StmtExecutor stmtExecutor; - - ConnectContext ctx; - @SerializedName("tis") ConcurrentLinkedQueue historyTaskIdList; + @SerializedName("did") + private final long dbId; + @SerializedName("ln") + private String labelName; + @SerializedName("lt") + private InsertJob.LoadType loadType; + // 0: the job status is pending + // n/100: n is the number of task which has been finished + // 99: all tasks have been finished + // 100: txn status is visible and load has been finished + @SerializedName("pg") + private int progress; + @SerializedName("fm") + private FailMsg failMsg; + @SerializedName("plans") + private List plans = new ArrayList<>(); + private LoadStatistic loadStatistic = new LoadStatistic(); + private Set finishedTaskIds = new HashSet<>(); + private ConcurrentHashMap 
idToTasks = new ConcurrentHashMap<>(); + private Map properties = new HashMap<>(); + private Set tableNames; + private AuthorizationInfo authorizationInfo; + + private ConnectContext ctx; + private StmtExecutor stmtExecutor; + private List errorTabletInfos = new ArrayList<>(); + private List commitInfos = new ArrayList<>(); + + // max save task num, do we need to config it? + private static final int MAX_SAVE_TASK_NUM = 100; + + /** + * load job type + */ + public enum LoadType { + BULK, + SPARK, + LOCAL_FILE, + UNKNOWN + + } + + public enum Priority { + HIGH(0), + NORMAL(1), + LOW(2); + + Priority(int value) { + this.value = value; + } + + private final int value; + + public int getValue() { + return value; + } + } + + public InsertJob(String jobName, + JobStatus jobStatus, + String dbName, + String comment, + UserIdentity createUser, + JobExecutionConfiguration jobConfig, + Long createTimeMs, + String executeSql) { + super(getNextJobId(), jobName, jobStatus, dbName, comment, createUser, + jobConfig, createTimeMs, executeSql, null); + this.dbId = ConnectContext.get().getCurrentDbId(); + } + + public InsertJob(ConnectContext ctx, + StmtExecutor executor, + String labelName, + List plans, + Set sinkTableNames, + Map properties, + String comment, + JobExecutionConfiguration jobConfig) { + super(getNextJobId(), labelName, JobStatus.RUNNING, null, + comment, ctx.getCurrentUserIdentity(), jobConfig); + this.ctx = ctx; + this.plans = plans; + this.stmtExecutor = executor; + this.dbId = ctx.getCurrentDbId(); + this.labelName = labelName; + this.tableNames = sinkTableNames; + this.properties = properties; + // TODO: not support other type yet + this.loadType = InsertJob.LoadType.BULK; + } @Override - public List createTasks(TaskType taskType, Map taskContext) { - //nothing need to do in insert job - InsertTask task = new InsertTask(null, getCurrentDbName(), getExecuteSql(), getCreateUser()); - task.setJobId(getJobId()); - task.setTaskType(taskType); - task.setTaskId(Env.getCurrentEnv().getNextId()); - ArrayList tasks = new ArrayList<>(); - tasks.add(task); - super.initTasks(tasks); - recordTask(task.getTaskId()); - return tasks; + public List createTasks(TaskType taskType, Map taskContext) { + if (plans.isEmpty()) { + InsertTask task = new InsertTask(labelName, getCurrentDbName(), getExecuteSql(), getCreateUser()); + idToTasks.put(task.getTaskId(), task); + recordTask(task.getTaskId()); + } else { + // use for load stmt + for (InsertIntoTableCommand logicalPlan : plans) { + if (!logicalPlan.getLabelName().isPresent()) { + throw new IllegalArgumentException("Load plan need label name."); + } + InsertTask task = new InsertTask(logicalPlan, ctx, stmtExecutor, loadStatistic); + idToTasks.put(task.getTaskId(), task); + recordTask(task.getTaskId()); + } + } + initTasks(idToTasks.values(), taskType); + return new ArrayList<>(idToTasks.values()); } public void recordTask(long id) { @@ -116,7 +252,6 @@ public class InsertJob extends AbstractJob { if (historyTaskIdList.size() >= Config.max_persistence_task_count) { historyTaskIdList.poll(); } - Env.getCurrentEnv().getEditLog().logUpdateJob(this); } @Override @@ -125,23 +260,27 @@ public class InsertJob extends AbstractJob { } @Override - public boolean isReadyForScheduling(Map taskContext) { + public void cancelAllTasks() throws JobException { + try { + checkAuth("CANCEL LOAD"); + super.cancelAllTasks(); + this.failMsg = new FailMsg(FailMsg.CancelType.USER_CANCEL, "user cancel"); + } catch (DdlException e) { + throw new JobException(e); + } + } + + 
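For orientation, a minimal sketch of how the plan-based InsertJob constructor above is expected to be driven from the LOAD path. The variables planList, sinkTables, properties and the surrounding context are placeholders; only the constructor signature and the JobManager registration come from this patch.

    // Sketch only: assumes a ConnectContext ctx, a StmtExecutor and a prepared JobExecutionConfiguration.
    InsertJob job = new InsertJob(ctx, stmtExecutor, labelName,
            planList,     // List<InsertIntoTableCommand>, one per data description, each carrying a label
            sinkTables,   // Set<String> of target table names, checked later for LOAD privilege
            properties,   // e.g. timeout / max_filter_ratio, read back by getTimeout() / getMaxFilterRatio()
            comment, jobConfig);
    Env.getCurrentEnv().getJobManager().registerJob(job); // logs the creation and schedules the tasks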
@Override + public boolean isReadyForScheduling(Map taskContext) { return CollectionUtils.isEmpty(getRunningTasks()); } - - @Override - public void cancelAllTasks() throws JobException { - super.cancelAllTasks(); - } - - @Override protected void checkJobParamsInternal() { - if (command == null && StringUtils.isBlank(getExecuteSql())) { + if (plans.isEmpty() && StringUtils.isBlank(getExecuteSql())) { throw new IllegalArgumentException("command or sql is null,must be set"); } - if (null != command && !getJobConfig().getExecuteType().equals(JobExecuteType.INSTANT)) { + if (!plans.isEmpty() && !getJobConfig().getExecuteType().equals(JobExecuteType.INSTANT)) { throw new IllegalArgumentException("command must be null when executeType is not instant"); } } @@ -153,27 +292,22 @@ public class InsertJob extends AbstractJob { } //TODO it's will be refactor, we will storage task info in job inner and query from it List taskIdList = new ArrayList<>(this.historyTaskIdList); + Collections.reverse(taskIdList); - List loadJobs = Env.getCurrentEnv().getLoadManager().queryLoadJobsByJobIds(taskIdList); - if (CollectionUtils.isEmpty(loadJobs)) { + return queryLoadTasksByTaskIds(taskIdList); + } + + public List queryLoadTasksByTaskIds(List taskIdList) { + if (taskIdList.isEmpty()) { return new ArrayList<>(); } - List tasks = new ArrayList<>(); - loadJobs.forEach(loadJob -> { - InsertTask task; - try { - task = new InsertTask(loadJob.getLabel(), loadJob.getDb().getFullName(), null, getCreateUser()); - task.setCreateTimeMs(loadJob.getCreateTimestamp()); - } catch (MetaNotFoundException e) { - log.warn("load job not found, job id is {}", loadJob.getId()); - return; + List jobs = new ArrayList<>(); + taskIdList.forEach(id -> { + if (null != idToTasks.get(id)) { + jobs.add(idToTasks.get(id)); } - task.setJobId(getJobId()); - task.setTaskId(loadJob.getId()); - task.setLoadJob(loadJob); - tasks.add(task); }); - return tasks; + return jobs; } @Override @@ -193,6 +327,12 @@ public class InsertJob extends AbstractJob { @Override public void onTaskFail(InsertTask task) { + try { + updateJobStatus(JobStatus.STOPPED); + this.failMsg = new FailMsg(FailMsg.CancelType.LOAD_RUN_FAIL, task.getErrMsg()); + } catch (JobException e) { + throw new RuntimeException(e); + } getRunningTasks().remove(task); } @@ -203,7 +343,129 @@ public class InsertJob extends AbstractJob { @Override public List getShowInfo() { - return super.getCommonShowInfo(); + try { + // check auth + checkAuth("SHOW LOAD"); + List jobInfo = Lists.newArrayList(); + // jobId + jobInfo.add(getJobId().toString()); + // label + if (StringUtils.isEmpty(getLabelName())) { + jobInfo.add(FeConstants.null_string); + } else { + jobInfo.add(getLabelName()); + } + // state + if (getJobStatus() == JobStatus.STOPPED) { + jobInfo.add("CANCELLED"); + } else { + jobInfo.add(getJobStatus().name()); + } + + // progress + String progress = Env.getCurrentProgressManager().getProgressInfo(String.valueOf(getJobId())); + switch (getJobStatus()) { + case RUNNING: + if (isPending()) { + jobInfo.add("ETL:0%; LOAD:0%"); + } else { + jobInfo.add("ETL:100%; LOAD:" + progress + "%"); + } + break; + case FINISHED: + jobInfo.add("ETL:100%; LOAD:100%"); + break; + case STOPPED: + default: + jobInfo.add("ETL:N/A; LOAD:N/A"); + break; + } + // type + jobInfo.add(loadType.name()); + + // etl info + if (loadStatistic.getCounters().size() == 0) { + jobInfo.add(FeConstants.null_string); + } else { + jobInfo.add(Joiner.on("; ").withKeyValueSeparator("=").join(loadStatistic.getCounters())); + } + + // task 
info + jobInfo.add("cluster:" + getResourceName() + "; timeout(s):" + getTimeout() + + "; max_filter_ratio:" + getMaxFilterRatio() + "; priority:" + getPriority()); + // error msg + if (failMsg == null) { + jobInfo.add(FeConstants.null_string); + } else { + jobInfo.add("type:" + failMsg.getCancelType() + "; msg:" + failMsg.getMsg()); + } + + // create time + jobInfo.add(TimeUtils.longToTimeString(getCreateTimeMs())); + // etl start time + jobInfo.add(TimeUtils.longToTimeString(getStartTimeMs())); + // etl end time + jobInfo.add(TimeUtils.longToTimeString(getStartTimeMs())); + // load start time + jobInfo.add(TimeUtils.longToTimeString(getStartTimeMs())); + // load end time + jobInfo.add(TimeUtils.longToTimeString(getFinishTimeMs())); + // tracking urls + List trackingUrl = idToTasks.values().stream() + .map(task -> { + if (StringUtils.isNotEmpty(task.getTrackingUrl())) { + return task.getTrackingUrl(); + } else { + return FeConstants.null_string; + } + }) + .collect(Collectors.toList()); + if (trackingUrl.isEmpty()) { + jobInfo.add(FeConstants.null_string); + } else { + jobInfo.add(trackingUrl.toString()); + } + // job details + jobInfo.add(loadStatistic.toJson()); + // transaction id + jobInfo.add(String.valueOf(0)); + // error tablets + jobInfo.add(errorTabletsToJson()); + // user, some load job may not have user info + if (getCreateUser() == null || getCreateUser().getQualifiedUser() == null) { + jobInfo.add(FeConstants.null_string); + } else { + jobInfo.add(getCreateUser().getQualifiedUser()); + } + // comment + jobInfo.add(getComment()); + return jobInfo; + } catch (DdlException e) { + throw new RuntimeException(e); + } + } + + private String getPriority() { + return properties.getOrDefault(LoadStmt.PRIORITY, Priority.NORMAL.name()); + } + + public double getMaxFilterRatio() { + return Double.parseDouble(properties.getOrDefault(LoadStmt.MAX_FILTER_RATIO_PROPERTY, "0.0")); + } + + public long getTimeout() { + if (properties.containsKey(LoadStmt.TIMEOUT_PROPERTY)) { + return Long.parseLong(properties.get(LoadStmt.TIMEOUT_PROPERTY)); + } + return Config.broker_load_default_timeout_second; + } + + + public static InsertJob readFields(DataInput in) throws IOException { + String jsonJob = Text.readString(in); + InsertJob job = GsonUtils.GSON.fromJson(jsonJob, InsertJob.class); + job.setRunningTasks(new ArrayList<>()); + return job; } @Override @@ -211,19 +473,129 @@ public class InsertJob extends AbstractJob { Text.writeString(out, GsonUtils.GSON.toJson(this)); } - private static final ShowResultSetMetaData TASK_META_DATA = - ShowResultSetMetaData.builder() - .addColumn(new Column("TaskId", ScalarType.createVarchar(20))) - .addColumn(new Column("Label", ScalarType.createVarchar(20))) - .addColumn(new Column("Status", ScalarType.createVarchar(20))) - .addColumn(new Column("EtlInfo", ScalarType.createVarchar(20))) - .addColumn(new Column("TaskInfo", ScalarType.createVarchar(20))) - .addColumn(new Column("ErrorMsg", ScalarType.createVarchar(20))) + public String errorTabletsToJson() { + Map map = new HashMap<>(); + errorTabletInfos.stream().limit(Config.max_error_tablet_of_broker_load) + .forEach(p -> map.put(p.getTabletId(), p.getMsg())); + Gson gson = new GsonBuilder().disableHtmlEscaping().create(); + return gson.toJson(map); + } - .addColumn(new Column("CreateTimeMs", ScalarType.createVarchar(20))) - .addColumn(new Column("FinishTimeMs", ScalarType.createVarchar(20))) - .addColumn(new Column("TrackingUrl", ScalarType.createVarchar(20))) - .addColumn(new Column("LoadStatistic", 
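A small illustration of the defaults applied by the property getters above when the LOAD statement omits them; the sample values are made up, the keys are the LoadStmt constants referenced in the code.

    // properties = {"timeout" -> "3600", "max_filter_ratio" -> "0.1"}
    // getTimeout()        -> 3600, falling back to Config.broker_load_default_timeout_second when absent
    // getMaxFilterRatio() -> 0.1, defaulting to 0.0
    // getPriority()       -> "NORMAL" unless a "priority" property was supplied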
ScalarType.createVarchar(20))) - .addColumn(new Column("User", ScalarType.createVarchar(20))) - .build(); + public void updateLoadingStatus(Long beId, TUniqueId loadId, TUniqueId fragmentId, long scannedRows, + long scannedBytes, boolean isDone) { + loadStatistic.updateLoadProgress(beId, loadId, fragmentId, scannedRows, scannedBytes, isDone); + progress = (int) ((double) finishedTaskIds.size() / idToTasks.size() * 100); + if (progress == 100) { + progress = 99; + } + } + + private void checkAuth(String command) throws DdlException { + if (authorizationInfo == null) { + // use the old method to check priv + checkAuthWithoutAuthInfo(command); + return; + } + if (!Env.getCurrentEnv().getAccessManager().checkPrivByAuthInfo(ConnectContext.get(), authorizationInfo, + PrivPredicate.LOAD)) { + ErrorReport.reportDdlException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, + Privilege.LOAD_PRIV); + } + } + + /** + * This method is compatible with old load job without authorization info + * If db or table name could not be found by id, it will throw the NOT_EXISTS_ERROR + * + * @throws DdlException + */ + private void checkAuthWithoutAuthInfo(String command) throws DdlException { + Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(dbId); + // check auth + if (tableNames == null || tableNames.isEmpty()) { + // forward compatibility + if (!Env.getCurrentEnv().getAccessManager().checkDbPriv(ConnectContext.get(), db.getFullName(), + PrivPredicate.LOAD)) { + ErrorReport.reportDdlException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, + Privilege.LOAD_PRIV); + } + } else { + for (String tblName : tableNames) { + if (!Env.getCurrentEnv().getAccessManager().checkTblPriv(ConnectContext.get(), db.getFullName(), + tblName, PrivPredicate.LOAD)) { + ErrorReport.reportDdlException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, + command, + ConnectContext.get().getQualifiedUser(), + ConnectContext.get().getRemoteIP(), db.getFullName() + ": " + tblName); + } + } + } + } + + public void unprotectReadEndOperation(InsertJob replayLog) { + setJobStatus(replayLog.getJobStatus()); + progress = replayLog.getProgress(); + setStartTimeMs(replayLog.getStartTimeMs()); + setFinishTimeMs(replayLog.getFinishTimeMs()); + failMsg = replayLog.getFailMsg(); + } + + public String getResourceName() { + // TODO: get tvf param from tvf relation + return "N/A"; + } + + public boolean isRunning() { + return getJobStatus() != JobStatus.FINISHED; + } + + public boolean isPending() { + return getJobStatus() != JobStatus.FINISHED; + } + + public boolean isCancelled() { + return getJobStatus() == JobStatus.STOPPED; + } + + @Override + public void onRegister() throws JobException { + try { + if (StringUtils.isNotEmpty(labelName)) { + Env.getCurrentEnv().getLabelProcessor().addJob(this); + } + } catch (LabelAlreadyUsedException e) { + throw new JobException(e); + } + } + + @Override + public void onUnRegister() throws JobException { + // TODO: need record cancelled jobs in order to show cancelled job + // Env.getCurrentEnv().getLabelProcessor().removeJob(getDbId(), getLabelName()); + } + + @Override + public void onReplayCreate() throws JobException { + JobExecutionConfiguration jobConfig = new JobExecutionConfiguration(); + jobConfig.setExecuteType(JobExecuteType.INSTANT); + setJobConfig(jobConfig); + onRegister(); + checkJobParams(); + log.info(new LogBuilder(LogKey.LOAD_JOB, getJobId()).add("msg", "replay create load job").build()); + } + + @Override + public void onReplayEnd(AbstractJob> replayJob) throws JobException { + if (!(replayJob 
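A worked example of the progress convention documented on the progress field and enforced in updateLoadingStatus() above; the task counts are illustrative.

    // With idToTasks.size() == 4:
    //   0 finished tasks -> progress = 0   (job still pending)
    //   3 finished tasks -> progress = 75
    //   4 finished tasks -> (int) (4.0 / 4 * 100) == 100, capped to 99 until the transaction is visible,
    //                       after which the job reports 100 and moves to FINISHED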
instanceof InsertJob)) { + return; + } + InsertJob insertJob = (InsertJob) replayJob; + unprotectReadEndOperation(insertJob); + log.info(new LogBuilder(LogKey.LOAD_JOB, + insertJob.getJobId()).add("operation", insertJob).add("msg", "replay end load job").build()); + } + + public int getProgress() { + return progress; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java index 7e98895026..e85e7a1b02 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java @@ -25,7 +25,8 @@ import org.apache.doris.common.FeConstants; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.job.exception.JobException; import org.apache.doris.job.task.AbstractTask; -import org.apache.doris.load.loadv2.LoadJob; +import org.apache.doris.load.FailMsg; +import org.apache.doris.load.loadv2.LoadStatistic; import org.apache.doris.nereids.parser.NereidsParser; import org.apache.doris.nereids.trees.plans.commands.InsertIntoTableCommand; import org.apache.doris.qe.ConnectContext; @@ -34,12 +35,12 @@ import org.apache.doris.thrift.TCell; import org.apache.doris.thrift.TRow; import org.apache.doris.thrift.TUniqueId; -import com.google.common.base.Joiner; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import lombok.Getter; import lombok.Setter; import lombok.extern.log4j.Log4j2; +import org.apache.commons.lang3.StringUtils; import java.util.Optional; import java.util.UUID; @@ -53,8 +54,6 @@ public class InsertTask extends AbstractTask { new Column("JobId", ScalarType.createStringType()), new Column("Label", ScalarType.createStringType()), new Column("Status", ScalarType.createStringType()), - new Column("EtlInfo", ScalarType.createStringType()), - new Column("TaskInfo", ScalarType.createStringType()), new Column("ErrorMsg", ScalarType.createStringType()), new Column("CreateTimeMs", ScalarType.createStringType()), new Column("FinishTimeMs", ScalarType.createStringType()), @@ -73,30 +72,69 @@ public class InsertTask extends AbstractTask { } private String labelName; - private InsertIntoTableCommand command; - private StmtExecutor stmtExecutor; - private ConnectContext ctx; - private String sql; - private String currentDb; - private UserIdentity userIdentity; - + private LoadStatistic loadStatistic; private AtomicBoolean isCanceled = new AtomicBoolean(false); - private AtomicBoolean isFinished = new AtomicBoolean(false); - private static final String LABEL_SPLITTER = "_"; + private FailMsg failMsg; + @Getter + private String trackingUrl; @Getter @Setter - private LoadJob loadJob; + private InsertJob jobInfo; + private TaskType taskType = TaskType.PENDING; + private MergeType mergeType = MergeType.APPEND; + /** + * task merge type + */ + enum MergeType { + MERGE, + APPEND, + DELETE + } + + /** + * task type + */ + enum TaskType { + UNKNOWN, // this is only for ISSUE #2354 + PENDING, + LOADING, + FINISHED, + FAILED, + CANCELLED + } + + public InsertTask(InsertIntoTableCommand insertInto, + ConnectContext ctx, StmtExecutor executor, LoadStatistic statistic) { + this(insertInto.getLabelName().get(), insertInto, ctx, executor, statistic); + } + + public InsertTask(String labelName, String currentDb, String sql, UserIdentity userIdentity) { + this.labelName = labelName; + this.sql = sql; + this.currentDb = currentDb; + 
this.userIdentity = userIdentity; + } + + public InsertTask(String labelName, InsertIntoTableCommand insertInto, + ConnectContext ctx, StmtExecutor executor, LoadStatistic statistic) { + this.labelName = labelName; + this.command = insertInto; + this.userIdentity = ctx.getCurrentUserIdentity(); + this.ctx = ctx; + this.stmtExecutor = executor; + this.loadStatistic = statistic; + } @Override public void before() throws JobException { @@ -109,15 +147,19 @@ public class InsertTask extends AbstractTask { ctx.setCurrentUserIdentity(userIdentity); ctx.getState().reset(); ctx.setThreadLocalInfo(); - ctx.setDatabase(currentDb); + if (StringUtils.isNotEmpty(currentDb)) { + ctx.setDatabase(currentDb); + } TUniqueId queryId = generateQueryId(UUID.randomUUID().toString()); ctx.getSessionVariable().enableFallbackToOriginalPlanner = false; stmtExecutor = new StmtExecutor(ctx, (String) null); ctx.setQueryId(queryId); - NereidsParser parser = new NereidsParser(); - this.command = (InsertIntoTableCommand) parser.parseSingle(sql); - this.command.setLabelName(Optional.of(getJobId() + LABEL_SPLITTER + getTaskId())); - this.command.setJobId(getTaskId()); + if (StringUtils.isNotEmpty(sql)) { + NereidsParser parser = new NereidsParser(); + this.command = (InsertIntoTableCommand) parser.parseSingle(sql); + this.command.setLabelName(Optional.of(getJobId() + LABEL_SPLITTER + getTaskId())); + this.command.setJobId(getTaskId()); + } super.before(); @@ -128,14 +170,6 @@ public class InsertTask extends AbstractTask { return new TUniqueId(taskId.getMostSignificantBits(), taskId.getLeastSignificantBits()); } - public InsertTask(String labelName, String currentDb, String sql, UserIdentity userIdentity) { - this.labelName = labelName; - this.sql = sql; - this.currentDb = currentDb; - this.userIdentity = userIdentity; - - } - @Override public void run() throws JobException { try { @@ -143,7 +177,7 @@ public class InsertTask extends AbstractTask { log.info("task has been canceled, task id is {}", getTaskId()); return; } - command.run(ctx, stmtExecutor); + command.runWithUpdateInfo(ctx, stmtExecutor, loadStatistic); } catch (Exception e) { log.warn("execute insert task error, job id is {}, task id is {},sql is {}", getJobId(), getTaskId(), sql, e); @@ -178,42 +212,28 @@ public class InsertTask extends AbstractTask { @Override public TRow getTvfInfo() { TRow trow = new TRow(); - if (loadJob == null) { + if (jobInfo == null) { // if task not start, load job is null,return pending task show info return getPendingTaskTVFInfo(); } - trow.addToColumnValue(new TCell().setStringVal(String.valueOf(loadJob.getId()))); + trow.addToColumnValue(new TCell().setStringVal(String.valueOf(jobInfo.getJobId()))); trow.addToColumnValue(new TCell().setStringVal(String.valueOf(getJobId()))); - trow.addToColumnValue(new TCell().setStringVal(loadJob.getLabel())); - trow.addToColumnValue(new TCell().setStringVal(loadJob.getState().name())); - // etl info - String etlInfo = FeConstants.null_string; - if (!loadJob.getLoadingStatus().getCounters().isEmpty()) { - etlInfo = Joiner.on("; ").withKeyValueSeparator("=").join(loadJob.getLoadingStatus().getCounters()); - } - trow.addToColumnValue(new TCell().setStringVal(etlInfo)); - - // task info - String taskInfo = "cluster:" + loadJob.getResourceName() + "; timeout(s):" + loadJob.getTimeout() - + "; max_filter_ratio:" + loadJob.getMaxFilterRatio() + "; priority:" + loadJob.getPriority(); - trow.addToColumnValue(new TCell().setStringVal(taskInfo)); - + trow.addToColumnValue(new 
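To clarify the two label sources in InsertTask above: a task created from a stored SQL text generates its own label in before(), while a task created from a LOAD plan keeps the label passed to its constructor. The ids below are made up.

    // Recurring job: jobId = 11001, taskId = 987654321
    //   before() parses the stored SQL and sets the label to "11001_987654321" (jobId + LABEL_SPLITTER + taskId)
    // LOAD path: the InsertIntoTableCommand already carries the user-supplied label, so before() leaves it unchanged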
TCell().setStringVal(labelName)); + trow.addToColumnValue(new TCell().setStringVal(jobInfo.getJobStatus().name())); // err msg String errMsg = FeConstants.null_string; - if (loadJob.getFailMsg() != null) { - errMsg = "type:" + loadJob.getFailMsg().getCancelType() + "; msg:" + loadJob.getFailMsg().getMsg(); + if (failMsg != null) { + errMsg = "type:" + failMsg.getCancelType() + "; msg:" + failMsg.getMsg(); } trow.addToColumnValue(new TCell().setStringVal(errMsg)); - // create time - trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(loadJob.getCreateTimestamp()))); - + trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(jobInfo.getCreateTimeMs()))); // load end time - trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(loadJob.getFinishTimestamp()))); + trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(jobInfo.getFinishTimeMs()))); // tracking url - trow.addToColumnValue(new TCell().setStringVal(loadJob.getLoadingStatus().getTrackingUrl())); - trow.addToColumnValue(new TCell().setStringVal(loadJob.getLoadStatistic().toJson())); - trow.addToColumnValue(new TCell().setStringVal(loadJob.getUserInfo().getQualifiedUser())); + trow.addToColumnValue(new TCell().setStringVal(trackingUrl)); + trow.addToColumnValue(new TCell().setStringVal(loadStatistic.toJson())); + trow.addToColumnValue(new TCell().setStringVal(userIdentity.getQualifiedUser())); return trow; } @@ -225,8 +245,6 @@ public class InsertTask extends AbstractTask { trow.addToColumnValue(new TCell().setStringVal(getJobId() + LABEL_SPLITTER + getTaskId())); trow.addToColumnValue(new TCell().setStringVal(getStatus().name())); trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string)); - trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string)); - trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string)); trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(getCreateTimeMs()))); trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string)); trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string)); diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVJob.java index c500e69329..669a2806a5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVJob.java @@ -125,7 +125,7 @@ public class MTMVJob extends AbstractJob { task.setTaskType(taskType); ArrayList tasks = new ArrayList<>(); tasks.add(task); - super.initTasks(tasks); + super.initTasks(tasks, taskType); return tasks; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java index 776af152c6..814d6b773a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java @@ -17,7 +17,15 @@ package org.apache.doris.job.manager; +import org.apache.doris.analysis.CancelLoadStmt; +import org.apache.doris.analysis.CompoundPredicate; +import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.CaseSensibility; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.PatternMatcher; +import 
org.apache.doris.common.PatternMatcherWrapper; import org.apache.doris.common.io.Writable; import org.apache.doris.common.util.LogBuilder; import org.apache.doris.common.util.LogKey; @@ -26,44 +34,85 @@ import org.apache.doris.job.common.JobStatus; import org.apache.doris.job.common.JobType; import org.apache.doris.job.common.TaskType; import org.apache.doris.job.exception.JobException; +import org.apache.doris.job.extensions.insert.InsertJob; import org.apache.doris.job.scheduler.JobScheduler; import org.apache.doris.job.task.AbstractTask; +import org.apache.doris.load.loadv2.JobState; +import com.google.common.collect.Lists; import lombok.extern.log4j.Log4j2; import org.apache.commons.lang3.StringUtils; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.util.ArrayList; +import java.util.LinkedList; import java.util.List; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; @Log4j2 public class JobManager, C> implements Writable { - private final ConcurrentHashMap jobMap = new ConcurrentHashMap<>(32); - private JobScheduler jobScheduler; + private JobScheduler jobScheduler; + + // lock for job + // lock is private and must use after db lock + private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true); + + private void readLock() { + lock.readLock().lock(); + } + + private void readUnlock() { + lock.readLock().unlock(); + } + + private void writeLock() { + lock.writeLock().lock(); + } + + private void writeUnlock() { + lock.writeLock().unlock(); + } public void start() { - jobScheduler = new JobScheduler(jobMap); + jobScheduler = new JobScheduler(jobMap); jobScheduler.start(); } - public void registerJob(T job) throws JobException { - job.checkJobParams(); - checkJobNameExist(job.getJobName()); - if (jobMap.get(job.getJobId()) != null) { - throw new JobException("job id exist, jobId:" + job.getJobId()); - } - Env.getCurrentEnv().getEditLog().logCreateJob(job); - jobMap.put(job.getJobId(), job); - //check its need to scheduler - jobScheduler.scheduleOneJob(job); + + /** + * get running job + * + * @param jobId id + * @return running job + */ + public T getJob(long jobId) { + return jobMap.get(jobId); } + public void registerJob(T job) throws JobException { + writeLock(); + try { + job.onRegister(); + job.checkJobParams(); + checkJobNameExist(job.getJobName()); + if (jobMap.get(job.getJobId()) != null) { + throw new JobException("job id exist, jobId:" + job.getJobId()); + } + jobMap.put(job.getJobId(), job); + //check its need to scheduler + jobScheduler.scheduleOneJob(job); + job.logCreateOperation(); + } finally { + writeUnlock(); + } + } private void checkJobNameExist(String jobName) throws JobException { if (jobMap.values().stream().anyMatch(a -> a.getJobName().equals(jobName))) { @@ -72,11 +121,17 @@ public class JobManager, C> implements Writable { } public void unregisterJob(Long jobId) throws JobException { - checkJobExist(jobId); - jobMap.get(jobId).setJobStatus(JobStatus.STOPPED); - jobMap.get(jobId).cancelAllTasks(); - Env.getCurrentEnv().getEditLog().logDeleteJob(jobMap.get(jobId)); - jobMap.remove(jobId); + writeLock(); + try { + checkJobExist(jobId); + jobMap.get(jobId).setJobStatus(JobStatus.STOPPED); + jobMap.get(jobId).cancelAllTasks(); + jobMap.get(jobId).logFinalOperation(); + jobMap.get(jobId).onUnRegister(); + jobMap.remove(jobId); + } finally { + writeUnlock(); + } } public void unregisterJob(String jobName) throws 
JobException { @@ -95,7 +150,7 @@ public class JobManager, C> implements Writable { public void alterJobStatus(Long jobId, JobStatus status) throws JobException { checkJobExist(jobId); jobMap.get(jobId).updateJobStatus(status); - Env.getCurrentEnv().getEditLog().logUpdateJob(jobMap.get(jobId)); + jobMap.get(jobId).logUpdateOperation(); } public void alterJobStatus(String jobName, JobStatus jobStatus) throws JobException { @@ -169,13 +224,12 @@ public class JobManager, C> implements Writable { jobScheduler.schedulerInstantJob(jobMap.get(jobId), TaskType.MANUAL, context); } - public void replayCreateJob(T job) { + public void replayCreateJob(T job) throws JobException { if (jobMap.containsKey(job.getJobId())) { return; } jobMap.putIfAbsent(job.getJobId(), job); - log.info(new LogBuilder(LogKey.SCHEDULER_JOB, job.getJobId()) - .add("msg", "replay create scheduler job").build()); + job.onReplayCreate(); } /** @@ -187,13 +241,12 @@ public class JobManager, C> implements Writable { .add("msg", "replay update scheduler job").build()); } - public void replayDeleteJob(T job) { - if (null == jobMap.get(job.getJobId())) { + public void replayEndJob(T replayJob) throws JobException { + T job = jobMap.get(replayJob.getJobId()); + if (null == job) { return; } - jobMap.remove(job.getJobId()); - log.info(new LogBuilder(LogKey.SCHEDULER_JOB, job.getJobId()) - .add("msg", "replay delete scheduler job").build()); + job.onReplayEnd(replayJob); } public void cancelTaskById(String jobName, Long taskId) throws JobException { @@ -236,4 +289,135 @@ public class JobManager, C> implements Writable { return jobMap.get(jobId); } + + /** + * get load info by db + * + * @param dbId db id + * @param dbName db name + * @param labelValue label name + * @param accurateMatch accurate match + * @param jobState state + * @return load infos + * @throws AnalysisException ex + */ + public List> getLoadJobInfosByDb(long dbId, String dbName, + String labelValue, + boolean accurateMatch, + JobState jobState) throws AnalysisException { + LinkedList> loadJobInfos = new LinkedList<>(); + if (!Env.getCurrentEnv().getLabelProcessor().existJobs(dbId)) { + return loadJobInfos; + } + readLock(); + try { + List loadJobList = Env.getCurrentEnv().getLabelProcessor() + .filterJobs(dbId, labelValue, accurateMatch); + // check state + for (InsertJob loadJob : loadJobList) { + try { + if (jobState != null && !validState(jobState, loadJob)) { + continue; + } + // add load job info, convert String list to Comparable list + loadJobInfos.add(new ArrayList<>(loadJob.getShowInfo())); + } catch (RuntimeException e) { + // ignore this load job + log.warn("get load job info failed. 
job id: {}", loadJob.getJobId(), e); + } + } + return loadJobInfos; + } finally { + readUnlock(); + } + } + + private static boolean validState(JobState jobState, InsertJob loadJob) { + JobStatus status = loadJob.getJobStatus(); + switch (status) { + case RUNNING: + return jobState == JobState.PENDING || jobState == JobState.ETL + || jobState == JobState.LOADING || jobState == JobState.COMMITTED; + case STOPPED: + return jobState == JobState.CANCELLED; + case FINISHED: + return jobState == JobState.FINISHED; + default: + return false; + } + } + + public void cancelLoadJob(CancelLoadStmt cs) + throws JobException, AnalysisException, DdlException { + String dbName = cs.getDbName(); + String label = cs.getLabel(); + String state = cs.getState(); + CompoundPredicate.Operator operator = cs.getOperator(); + Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(dbName); + // List of load jobs waiting to be cancelled + List unfinishedLoadJob; + readLock(); + try { + List loadJobs = Env.getCurrentEnv().getLabelProcessor().getJobs(db); + List matchLoadJobs = Lists.newArrayList(); + addNeedCancelLoadJob(label, state, operator, loadJobs, matchLoadJobs); + if (matchLoadJobs.isEmpty()) { + throw new JobException("Load job does not exist"); + } + // check state here + unfinishedLoadJob = + matchLoadJobs.stream().filter(InsertJob::isRunning) + .collect(Collectors.toList()); + if (unfinishedLoadJob.isEmpty()) { + throw new JobException("There is no uncompleted job"); + } + } finally { + readUnlock(); + } + for (InsertJob loadJob : unfinishedLoadJob) { + try { + unregisterJob(loadJob.getJobId()); + } catch (JobException e) { + log.warn("Fail to cancel job, its label: {}", loadJob.getLabelName()); + } + } + } + + private static void addNeedCancelLoadJob(String label, String state, + CompoundPredicate.Operator operator, List loadJobs, + List matchLoadJobs) + throws AnalysisException { + PatternMatcher matcher = PatternMatcherWrapper.createMysqlPattern(label, + CaseSensibility.LABEL.getCaseSensibility()); + matchLoadJobs.addAll( + loadJobs.stream() + .filter(job -> !job.isCancelled()) + .filter(job -> { + if (operator != null) { + // compound + boolean labelFilter = + label.contains("%") ? matcher.match(job.getLabelName()) + : job.getLabelName().equalsIgnoreCase(label); + boolean stateFilter = job.getJobStatus().name().equalsIgnoreCase(state); + return CompoundPredicate.Operator.AND.equals(operator) ? labelFilter && stateFilter : + labelFilter || stateFilter; + } + if (StringUtils.isNotEmpty(label)) { + return label.contains("%") ? 
matcher.match(job.getLabelName()) + : job.getLabelName().equalsIgnoreCase(label); + } + if (StringUtils.isNotEmpty(state)) { + return job.getJobStatus().name().equalsIgnoreCase(state); + } + return false; + }).collect(Collectors.toList()) + ); + } + // public void updateJobProgress(Long jobId, Long beId, TUniqueId loadId, TUniqueId fragmentId, long scannedRows, + // long scannedBytes, boolean isDone) { + // AbstractJob job = jobMap.get(jobId); + // if (job != null) { + // job.updateLoadingStatus(beId, loadId, fragmentId, scannedRows, scannedBytes, isDone); + // } + // } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java index 7327183e95..71f6ff1c4f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java @@ -27,6 +27,7 @@ import org.apache.doris.job.exception.JobException; import com.google.gson.annotations.SerializedName; import lombok.Data; import lombok.extern.log4j.Log4j2; +import org.apache.commons.lang3.RandomUtils; @Data @Log4j2 @@ -52,6 +53,15 @@ public abstract class AbstractTask implements Task { @SerializedName(value = "emg") private String errMsg; + public AbstractTask() { + taskId = getNextTaskId(); + } + + private static long getNextTaskId() { + // do not use Env.getNextId(), just generate id without logging + return System.nanoTime() + RandomUtils.nextInt(); + } + @Override public void onFail(String msg) throws JobException { status = TaskStatus.FAILED; diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java index 996c3ccbb3..fc2f6fca9c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java @@ -73,6 +73,7 @@ import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.OriginStatement; import org.apache.doris.qe.SessionVariable; import org.apache.doris.scheduler.exception.JobException; +import org.apache.doris.scheduler.executor.TransientTaskExecutor; import org.apache.doris.thrift.TNetworkAddress; import com.google.common.base.Preconditions; @@ -198,7 +199,7 @@ public class ExportJob implements Writable { private List jobExecutorList; - private ConcurrentHashMap taskIdToExecutor = new ConcurrentHashMap<>(); + private ConcurrentHashMap taskIdToExecutor = new ConcurrentHashMap<>(); private Integer finishedTaskCount = 0; private List> allOutfileInfo = Lists.newArrayList(); @@ -380,6 +381,10 @@ public class ExportJob implements Writable { return statementBase; } + public List getTaskExecutors() { + return jobExecutorList; + } + private void generateExportJobExecutor() { jobExecutorList = Lists.newArrayList(); for (List selectStmts : selectStmtListPerParallel) { @@ -607,7 +612,7 @@ public class ExportJob implements Writable { // we need cancel all task taskIdToExecutor.keySet().forEach(id -> { try { - Env.getCurrentEnv().getExportTaskRegister().cancelTask(id); + Env.getCurrentEnv().getTransientTaskManager().cancelMemoryTask(id); } catch (JobException e) { LOG.warn("cancel export task {} exception: {}", id, e); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java b/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java index ae7a175b89..90be7f9f10 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java @@ -59,23 +59,26 @@ import java.util.stream.Collectors; public class ExportMgr { private static final Logger LOG = LogManager.getLogger(ExportJob.class); - - // lock for export job - // lock is private and must use after db lock - private ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true); - private Map exportIdToJob = Maps.newHashMap(); // exportJobId to exportJob // dbid ->