[test](Job)Delete the JOB show syntax (now we use TVF) and add tvf case (#28058)
This commit is contained in:
@ -689,7 +689,7 @@ nonterminal StatementBase stmt, show_stmt, show_param, help_stmt, load_stmt,
|
||||
create_routine_load_stmt, pause_routine_load_stmt, resume_routine_load_stmt, stop_routine_load_stmt,
|
||||
show_routine_load_stmt, show_routine_load_task_stmt, show_create_routine_load_stmt, show_create_load_stmt, show_create_reporitory_stmt,
|
||||
describe_stmt, alter_stmt, unset_var_stmt,
|
||||
create_job_stmt,pause_job_stmt,resume_job_stmt,stop_job_stmt,show_job_stmt,cancel_job_task_stmt,
|
||||
create_job_stmt, pause_job_stmt, resume_job_stmt,stop_job_stmt, cancel_job_task_stmt,
|
||||
use_stmt, kill_stmt, drop_stmt, recover_stmt, grant_stmt, revoke_stmt, create_stmt, set_stmt, sync_stmt, cancel_stmt, cancel_param, delete_stmt,
|
||||
switch_stmt, transaction_stmt, unsupported_stmt, export_stmt, admin_stmt, truncate_stmt,
|
||||
import_columns_stmt, import_delete_on_stmt, import_sequence_stmt, import_where_stmt, install_plugin_stmt, uninstall_plugin_stmt,
|
||||
@ -1160,8 +1160,6 @@ stmt ::=
|
||||
{: RESULT = stmt; :}
|
||||
| cancel_job_task_stmt : stmt
|
||||
{: RESULT = stmt; :}
|
||||
| show_job_stmt : stmt
|
||||
{: RESULT = stmt; :}
|
||||
| stop_job_stmt : stmt
|
||||
{: RESULT = stmt; :}
|
||||
| resume_job_stmt : stmt
|
||||
@ -2593,32 +2591,6 @@ create_job_stmt ::=
|
||||
RESULT = endTime;
|
||||
:}
|
||||
;
|
||||
show_job_stmt ::=
|
||||
KW_SHOW KW_JOBS
|
||||
{:
|
||||
RESULT = new ShowJobStmt(null,null);
|
||||
:}
|
||||
| KW_SHOW KW_MTMV KW_JOBS
|
||||
{:
|
||||
RESULT = new ShowJobStmt(null,org.apache.doris.job.common.JobType.MV);
|
||||
:}
|
||||
| KW_SHOW KW_MTMV KW_JOB KW_FOR job_label:jobLabel
|
||||
{:
|
||||
RESULT = new ShowJobStmt(jobLabel,org.apache.doris.job.common.JobType.MV);
|
||||
:}
|
||||
| KW_SHOW KW_JOB KW_FOR job_label:jobLabel
|
||||
{:
|
||||
RESULT = new ShowJobStmt(jobLabel,org.apache.doris.job.common.JobType.INSERT);
|
||||
:}
|
||||
| KW_SHOW KW_JOB KW_TASKS KW_FOR job_label:jobLabel
|
||||
{:
|
||||
RESULT = new ShowJobTaskStmt(jobLabel,org.apache.doris.job.common.JobType.INSERT);
|
||||
:}
|
||||
| KW_SHOW KW_MTMV KW_JOB KW_TASKS KW_FOR job_label:jobLabel
|
||||
{:
|
||||
RESULT = new ShowJobTaskStmt(jobLabel,org.apache.doris.job.common.JobType.MV);
|
||||
:}
|
||||
;
|
||||
pause_job_stmt ::=
|
||||
KW_PAUSE KW_JOB opt_wild_where
|
||||
{:
|
||||
|
||||
@ -1,115 +0,0 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.analysis;
|
||||
|
||||
import org.apache.doris.catalog.Column;
|
||||
import org.apache.doris.catalog.ScalarType;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.UserException;
|
||||
import org.apache.doris.job.common.JobType;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.qe.ShowResultSetMetaData;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import lombok.Getter;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* SHOW JOB [FOR JobName]
|
||||
* eg: show event
|
||||
* return all job in connection db
|
||||
* eg: show event for test
|
||||
* return job named test in connection db
|
||||
*/
|
||||
public class ShowJobStmt extends ShowStmt {
|
||||
|
||||
private static final ImmutableList<String> TITLE_NAMES =
|
||||
new ImmutableList.Builder<String>()
|
||||
.add("Id")
|
||||
.add("Name")
|
||||
.add("Definer")
|
||||
.add("ExecuteType")
|
||||
.add("RecurringStrategy")
|
||||
.add("Status")
|
||||
.add("ExecuteSql")
|
||||
.add("CreateTime")
|
||||
.add("Comment")
|
||||
.build();
|
||||
|
||||
private static final String MTMV_NAME_TITLE = "mtmv_name";
|
||||
|
||||
private static final String NAME_TITLE = "name";
|
||||
private final LabelName labelName;
|
||||
|
||||
@Getter
|
||||
private String dbFullName; // optional
|
||||
|
||||
@Getter
|
||||
private JobType jobType; // optional
|
||||
|
||||
/**
|
||||
* Supported job types, if we want to support more job types, we need to add them here.
|
||||
*/
|
||||
@Getter
|
||||
private List<JobType> jobTypes = Arrays.asList(JobType.INSERT); // optional
|
||||
|
||||
@Getter
|
||||
private String name; // optional
|
||||
@Getter
|
||||
private String pattern; // optional
|
||||
|
||||
public ShowJobStmt(LabelName labelName, JobType jobType) {
|
||||
this.labelName = labelName;
|
||||
this.jobType = jobType;
|
||||
this.name = labelName == null ? null : labelName.getLabelName();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void analyze(Analyzer analyzer) throws UserException {
|
||||
super.analyze(analyzer);
|
||||
checkAuth();
|
||||
}
|
||||
|
||||
private void checkAuth() throws AnalysisException {
|
||||
UserIdentity userIdentity = ConnectContext.get().getCurrentUserIdentity();
|
||||
if (!userIdentity.isRootUser()) {
|
||||
throw new AnalysisException("only root user can operate");
|
||||
}
|
||||
}
|
||||
|
||||
public static List<String> getTitleNames() {
|
||||
return TITLE_NAMES;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ShowResultSetMetaData getMetaData() {
|
||||
ShowResultSetMetaData.Builder builder = ShowResultSetMetaData.builder();
|
||||
|
||||
for (String title : TITLE_NAMES) {
|
||||
builder.addColumn(new Column(title, ScalarType.createVarchar(30)));
|
||||
}
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public RedirectStatus getRedirectStatus() {
|
||||
return RedirectStatus.FORWARD_NO_SYNC;
|
||||
}
|
||||
}
|
||||
@ -1,89 +0,0 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.analysis;
|
||||
|
||||
import org.apache.doris.catalog.Column;
|
||||
import org.apache.doris.catalog.ScalarType;
|
||||
import org.apache.doris.common.UserException;
|
||||
import org.apache.doris.job.common.JobType;
|
||||
import org.apache.doris.qe.ShowResultSetMetaData;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import lombok.Getter;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* SHOW JOB TASKS [FOR JobName]
|
||||
*/
|
||||
public class ShowJobTaskStmt extends ShowStmt {
|
||||
|
||||
private static final ImmutableList<String> TITLE_NAMES =
|
||||
new ImmutableList.Builder<String>()
|
||||
.add("TaskId")
|
||||
.add("JobId")
|
||||
.add("JobName")
|
||||
.add("CreateTime")
|
||||
.add("StartTime")
|
||||
.add("EndTime")
|
||||
.add("Status")
|
||||
.add("ExecuteSql")
|
||||
.add("Result")
|
||||
.add("ErrorMsg")
|
||||
.build();
|
||||
|
||||
@Getter
|
||||
private final LabelName labelName;
|
||||
|
||||
@Getter
|
||||
private String name; // optional
|
||||
|
||||
@Getter
|
||||
JobType jobType;
|
||||
|
||||
public ShowJobTaskStmt(LabelName labelName, JobType jobType) {
|
||||
this.labelName = labelName;
|
||||
this.jobType = jobType;
|
||||
this.name = labelName == null ? null : labelName.getLabelName();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void analyze(Analyzer analyzer) throws UserException {
|
||||
super.analyze(analyzer);
|
||||
CreateJobStmt.checkAuth();
|
||||
}
|
||||
|
||||
public static List<String> getTitleNames() {
|
||||
return TITLE_NAMES;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ShowResultSetMetaData getMetaData() {
|
||||
ShowResultSetMetaData.Builder builder = ShowResultSetMetaData.builder();
|
||||
|
||||
for (String title : TITLE_NAMES) {
|
||||
builder.addColumn(new Column(title, ScalarType.createVarchar(30)));
|
||||
}
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public RedirectStatus getRedirectStatus() {
|
||||
return RedirectStatus.NO_FORWARD;
|
||||
}
|
||||
}
|
||||
@ -38,25 +38,20 @@ import org.apache.doris.thrift.TUniqueId;
|
||||
import com.google.common.base.Joiner;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.Lists;
|
||||
import lombok.Getter;
|
||||
import lombok.Setter;
|
||||
import lombok.extern.log4j.Log4j2;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
/**
|
||||
* todo implement this later
|
||||
*/
|
||||
@Log4j2
|
||||
public class InsertTask extends AbstractTask {
|
||||
|
||||
public static final ImmutableList<Column> SCHEMA = ImmutableList.of(
|
||||
new Column("TaskId", ScalarType.createStringType()),
|
||||
new Column("JobId", ScalarType.createStringType()),
|
||||
new Column("Label", ScalarType.createStringType()),
|
||||
new Column("Status", ScalarType.createStringType()),
|
||||
new Column("EtlInfo", ScalarType.createStringType()),
|
||||
@ -96,6 +91,8 @@ public class InsertTask extends AbstractTask {
|
||||
|
||||
private AtomicBoolean isFinished = new AtomicBoolean(false);
|
||||
|
||||
private static final String LABEL_SPLITTER = "_";
|
||||
|
||||
|
||||
@Getter
|
||||
@Setter
|
||||
@ -121,7 +118,7 @@ public class InsertTask extends AbstractTask {
|
||||
ctx.setQueryId(queryId);
|
||||
NereidsParser parser = new NereidsParser();
|
||||
this.command = (InsertIntoTableCommand) parser.parseSingle(sql);
|
||||
this.command.setLabelName(Optional.of(getJobId() + "_" + getTaskId()));
|
||||
this.command.setLabelName(Optional.of(getJobId() + LABEL_SPLITTER + getTaskId()));
|
||||
this.command.setJobId(getTaskId());
|
||||
|
||||
super.before();
|
||||
@ -180,57 +177,15 @@ public class InsertTask extends AbstractTask {
|
||||
super.cancel();
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> getShowInfo() {
|
||||
if (null == loadJob) {
|
||||
return getPendingTaskShowInfo();
|
||||
}
|
||||
List<String> jobInfo = Lists.newArrayList();
|
||||
// jobId
|
||||
jobInfo.add(String.valueOf(loadJob.getId()));
|
||||
// label
|
||||
jobInfo.add(loadJob.getLabel());
|
||||
// state
|
||||
jobInfo.add(loadJob.getState().name());
|
||||
|
||||
// etl info
|
||||
if (loadJob.getLoadingStatus().getCounters().isEmpty()) {
|
||||
jobInfo.add(FeConstants.null_string);
|
||||
} else {
|
||||
jobInfo.add(Joiner.on("; ").withKeyValueSeparator("=").join(loadJob.getLoadingStatus().getCounters()));
|
||||
}
|
||||
|
||||
// task info
|
||||
jobInfo.add("cluster:" + loadJob.getResourceName() + "; timeout(s):" + loadJob.getTimeout()
|
||||
+ "; max_filter_ratio:" + loadJob.getMaxFilterRatio() + "; priority:" + loadJob.getPriority());
|
||||
// error msg
|
||||
if (loadJob.getFailMsg() == null) {
|
||||
jobInfo.add(FeConstants.null_string);
|
||||
} else {
|
||||
jobInfo.add("type:" + loadJob.getFailMsg().getCancelType() + "; msg:" + loadJob.getFailMsg().getMsg());
|
||||
}
|
||||
|
||||
// create time
|
||||
jobInfo.add(TimeUtils.longToTimeString(loadJob.getCreateTimestamp()));
|
||||
|
||||
// load end time
|
||||
jobInfo.add(TimeUtils.longToTimeString(loadJob.getFinishTimestamp()));
|
||||
// tracking url
|
||||
jobInfo.add(loadJob.getLoadingStatus().getTrackingUrl());
|
||||
jobInfo.add(loadJob.getLoadStatistic().toJson());
|
||||
// user
|
||||
jobInfo.add(loadJob.getUserInfo().getQualifiedUser());
|
||||
return jobInfo;
|
||||
}
|
||||
|
||||
@Override
|
||||
public TRow getTvfInfo() {
|
||||
TRow trow = new TRow();
|
||||
if (loadJob == null) {
|
||||
return trow;
|
||||
// if task not start, load job is null,return pending task show info
|
||||
return getPendingTaskTVFInfo();
|
||||
}
|
||||
|
||||
trow.addToColumnValue(new TCell().setStringVal(String.valueOf(loadJob.getId())));
|
||||
trow.addToColumnValue(new TCell().setStringVal(String.valueOf(getJobId())));
|
||||
trow.addToColumnValue(new TCell().setStringVal(loadJob.getLabel()));
|
||||
trow.addToColumnValue(new TCell().setStringVal(loadJob.getState().name()));
|
||||
// etl info
|
||||
@ -265,21 +220,21 @@ public class InsertTask extends AbstractTask {
|
||||
}
|
||||
|
||||
// if task not start, load job is null,return pending task show info
|
||||
private List<String> getPendingTaskShowInfo() {
|
||||
List<String> datas = new ArrayList<>();
|
||||
|
||||
datas.add(String.valueOf(getTaskId()));
|
||||
datas.add(getJobId() + "_" + getTaskId());
|
||||
datas.add(getStatus().name());
|
||||
datas.add(FeConstants.null_string);
|
||||
datas.add(FeConstants.null_string);
|
||||
datas.add(FeConstants.null_string);
|
||||
datas.add(TimeUtils.longToTimeString(getCreateTimeMs()));
|
||||
datas.add(FeConstants.null_string);
|
||||
datas.add(FeConstants.null_string);
|
||||
datas.add(FeConstants.null_string);
|
||||
datas.add(userIdentity.getQualifiedUser());
|
||||
return datas;
|
||||
private TRow getPendingTaskTVFInfo() {
|
||||
TRow trow = new TRow();
|
||||
trow.addToColumnValue(new TCell().setStringVal(String.valueOf(getTaskId())));
|
||||
trow.addToColumnValue(new TCell().setStringVal(String.valueOf(getJobId())));
|
||||
trow.addToColumnValue(new TCell().setStringVal(getJobId() + LABEL_SPLITTER + getTaskId()));
|
||||
trow.addToColumnValue(new TCell().setStringVal(getStatus().name()));
|
||||
trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string));
|
||||
trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string));
|
||||
trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string));
|
||||
trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(getCreateTimeMs())));
|
||||
trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string));
|
||||
trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string));
|
||||
trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string));
|
||||
trow.addToColumnValue(new TCell().setStringVal(userIdentity.getQualifiedUser()));
|
||||
return trow;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -44,12 +44,10 @@ import org.apache.doris.thrift.TUniqueId;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.gson.annotations.SerializedName;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
|
||||
public class MTMVTask extends AbstractTask {
|
||||
@ -143,20 +141,6 @@ public class MTMVTask extends AbstractTask {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> getShowInfo() {
|
||||
List<String> data = Lists.newArrayList();
|
||||
data.add(super.getJobId() + "");
|
||||
data.add(super.getTaskId() + "");
|
||||
data.add(super.getStatus() + "");
|
||||
data.add(TimeUtils.longToTimeString(super.getCreateTimeMs()));
|
||||
data.add(TimeUtils.longToTimeString(super.getStartTimeMs()));
|
||||
data.add(TimeUtils.longToTimeString(super.getFinishTimeMs()));
|
||||
data.add(String.valueOf(super.getFinishTimeMs() - super.getStartTimeMs()));
|
||||
data.add(sql);
|
||||
return data;
|
||||
}
|
||||
|
||||
@Override
|
||||
public TRow getTvfInfo() {
|
||||
TRow trow = new TRow();
|
||||
|
||||
@ -20,8 +20,6 @@ package org.apache.doris.job.task;
|
||||
import org.apache.doris.job.exception.JobException;
|
||||
import org.apache.doris.thrift.TRow;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* The Task interface represents a task that can be executed and managed by a scheduler.
|
||||
* All extension tasks must implement this interface.
|
||||
@ -68,12 +66,6 @@ public interface Task {
|
||||
*/
|
||||
void cancel() throws JobException;
|
||||
|
||||
/**
|
||||
* get the job's show info, which is used to sql show the task information
|
||||
* @return List<String> task common show info
|
||||
*/
|
||||
List<String> getShowInfo();
|
||||
|
||||
/**
|
||||
* get info for tvf `tasks`
|
||||
* @return TRow
|
||||
|
||||
@ -64,8 +64,6 @@ import org.apache.doris.analysis.ShowFrontendsStmt;
|
||||
import org.apache.doris.analysis.ShowFunctionsStmt;
|
||||
import org.apache.doris.analysis.ShowGrantsStmt;
|
||||
import org.apache.doris.analysis.ShowIndexStmt;
|
||||
import org.apache.doris.analysis.ShowJobStmt;
|
||||
import org.apache.doris.analysis.ShowJobTaskStmt;
|
||||
import org.apache.doris.analysis.ShowLastInsertStmt;
|
||||
import org.apache.doris.analysis.ShowLoadProfileStmt;
|
||||
import org.apache.doris.analysis.ShowLoadStmt;
|
||||
@ -183,7 +181,6 @@ import org.apache.doris.common.util.Util;
|
||||
import org.apache.doris.datasource.CatalogIf;
|
||||
import org.apache.doris.datasource.HMSExternalCatalog;
|
||||
import org.apache.doris.datasource.MaxComputeExternalCatalog;
|
||||
import org.apache.doris.job.task.AbstractTask;
|
||||
import org.apache.doris.load.DeleteHandler;
|
||||
import org.apache.doris.load.ExportJobState;
|
||||
import org.apache.doris.load.ExportMgr;
|
||||
@ -432,10 +429,6 @@ public class ShowExecutor {
|
||||
handleShowBuildIndexStmt();
|
||||
} else if (stmt instanceof ShowAnalyzeTaskStatus) {
|
||||
handleShowAnalyzeTaskStatus();
|
||||
} else if (stmt instanceof ShowJobStmt) {
|
||||
handleShowJob();
|
||||
} else if (stmt instanceof ShowJobTaskStmt) {
|
||||
handleShowJobTask();
|
||||
} else if (stmt instanceof ShowConvertLSCStmt) {
|
||||
handleShowConvertLSC();
|
||||
} else {
|
||||
@ -1424,53 +1417,6 @@ public class ShowExecutor {
|
||||
resultSet = new ShowResultSet(showWarningsStmt.getMetaData(), rows);
|
||||
}
|
||||
|
||||
private void handleShowJobTask() {
|
||||
ShowJobTaskStmt showJobTaskStmt = (ShowJobTaskStmt) stmt;
|
||||
List<List<String>> rows = Lists.newArrayList();
|
||||
List<org.apache.doris.job.base.AbstractJob> jobs = Env.getCurrentEnv().getJobManager()
|
||||
.queryJobs(showJobTaskStmt.getJobType(), showJobTaskStmt.getName());
|
||||
if (CollectionUtils.isEmpty(jobs)) {
|
||||
resultSet = new ShowResultSet(showJobTaskStmt.getMetaData(), rows);
|
||||
return;
|
||||
}
|
||||
org.apache.doris.job.base.AbstractJob job = jobs.get(0);
|
||||
List<AbstractTask> jobTasks = job.queryAllTasks();
|
||||
if (CollectionUtils.isEmpty(jobTasks)) {
|
||||
resultSet = new ShowResultSet(job.getTaskMetaData(), rows);
|
||||
return;
|
||||
}
|
||||
for (AbstractTask jobTask : jobTasks) {
|
||||
rows.add(jobTask.getShowInfo());
|
||||
}
|
||||
resultSet = new ShowResultSet(job.getTaskMetaData(), rows);
|
||||
}
|
||||
|
||||
private void handleShowJob() throws AnalysisException {
|
||||
ShowJobStmt showJobStmt = (ShowJobStmt) stmt;
|
||||
List<List<String>> rows = Lists.newArrayList();
|
||||
// if job exists
|
||||
List<org.apache.doris.job.base.AbstractJob> jobList;
|
||||
if (null == showJobStmt.getJobType()) {
|
||||
jobList = Env.getCurrentEnv().getJobManager()
|
||||
.queryJobs(showJobStmt.getJobTypes());
|
||||
} else {
|
||||
jobList = Env.getCurrentEnv().getJobManager()
|
||||
.queryJobs(showJobStmt.getJobType(), showJobStmt.getName());
|
||||
}
|
||||
|
||||
if (jobList.isEmpty()) {
|
||||
resultSet = new ShowResultSet(showJobStmt.getMetaData(), rows);
|
||||
return;
|
||||
}
|
||||
|
||||
// check auth
|
||||
|
||||
for (org.apache.doris.job.base.AbstractJob job : jobList) {
|
||||
rows.add(job.getShowInfo());
|
||||
}
|
||||
resultSet = new ShowResultSet(jobList.get(0).getJobMetaData(), rows);
|
||||
}
|
||||
|
||||
private void handleShowRoutineLoad() throws AnalysisException {
|
||||
ShowRoutineLoadStmt showRoutineLoadStmt = (ShowRoutineLoadStmt) stmt;
|
||||
List<List<String>> rows = Lists.newArrayList();
|
||||
|
||||
@ -580,7 +580,7 @@ public class MetadataGenerator {
|
||||
List<org.apache.doris.job.base.AbstractJob> jobList = Env.getCurrentEnv().getJobManager().queryJobs(jobType);
|
||||
|
||||
for (org.apache.doris.job.base.AbstractJob job : jobList) {
|
||||
List<AbstractTask> tasks = job.queryTasks();
|
||||
List<AbstractTask> tasks = job.queryAllTasks();
|
||||
for (AbstractTask task : tasks) {
|
||||
TRow tvfInfo = task.getTvfInfo();
|
||||
if (tvfInfo != null) {
|
||||
|
||||
@ -65,7 +65,8 @@ suite("test_base_insert_job") {
|
||||
"replication_allocation" = "tag.location.default: 1"
|
||||
);
|
||||
"""
|
||||
def currentMs=System.currentTimeMillis()+1000;
|
||||
// Enlarge this parameter to avoid other factors that cause time verification to fail when submitting.
|
||||
def currentMs=System.currentTimeMillis()+20000;
|
||||
def dateTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(currentMs), ZoneId.systemDefault());
|
||||
|
||||
def formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
|
||||
@ -73,35 +74,34 @@ suite("test_base_insert_job") {
|
||||
def dataCount = sql """select count(*) from ${tableName}"""
|
||||
assert dataCount.get(0).get(0) == 0
|
||||
sql """
|
||||
CREATE JOB ${jobName} ON SCHEDULER at '${startTime}' comment 'test for test&68686781jbjbhj//ncsa' DO insert into ${tableName} values ('2023-07-19', sleep(1000), 1001);
|
||||
CREATE JOB ${jobName} ON SCHEDULER at '${startTime}' comment 'test for test&68686781jbjbhj//ncsa' DO insert into ${tableName} values ('2023-07-19', sleep(10000), 1001);
|
||||
"""
|
||||
|
||||
Thread.sleep(3000)
|
||||
|
||||
Thread.sleep(25000)
|
||||
def onceJob = sql """select id from jobs("type"="insert") where Name='${jobName}'"""
|
||||
assert onceJob.size() == 1
|
||||
def onceJobId= onceJob.get(0).get(0);
|
||||
// test cancel task
|
||||
def datas = sql """show job tasks for ${jobName}"""
|
||||
def datas = sql """select status,taskid from tasks("type"="insert") where jobid= ${onceJobId}"""
|
||||
println datas
|
||||
assert datas.size() == 1
|
||||
println datas.get(0).get(2)
|
||||
assert datas.get(0).get(2) == "RUNNING"
|
||||
def taskId = datas.get(0).get(0)
|
||||
assert datas.get(0).get(0) == "RUNNING"
|
||||
def taskId = datas.get(0).get(1)
|
||||
sql """cancel task where jobName='${jobName}' and taskId= ${taskId}"""
|
||||
def cancelTask = sql """ show job tasks for ${jobName}"""
|
||||
def cancelTask = sql """ select status from tasks("type"="insert") where jobid= ${onceJobId}"""
|
||||
println cancelTask
|
||||
//check task status
|
||||
assert cancelTask.size() == 1
|
||||
assert cancelTask.get(0).get(2) == "CANCELED"
|
||||
assert cancelTask.get(0).get(0) == "CANCELED"
|
||||
// check table data
|
||||
def dataCount1 = sql """select count(*) from ${tableName}"""
|
||||
def dataCount1 = sql """select count(1) from ${tableName}"""
|
||||
assert dataCount1.get(0).get(0) == 0
|
||||
// check job status
|
||||
def oncejob=sql """show job for ${jobName} """
|
||||
def oncejob=sql """select status,comment from jobs("type"="insert") where Name='${jobName}' """
|
||||
println oncejob
|
||||
assert oncejob.get(0).get(5) == "FINISHED"
|
||||
assert oncejob.get(0).get(0) == "FINISHED"
|
||||
//assert comment
|
||||
println oncejob.get(0).get(8)
|
||||
//check comment
|
||||
assert oncejob.get(0).get(8) == "test for test&68686781jbjbhj//ncsa"
|
||||
assert oncejob.get(0).get(1) == "test for test&68686781jbjbhj//ncsa"
|
||||
|
||||
try{
|
||||
sql """
|
||||
|
||||
@ -288,11 +288,11 @@ suite("test_build_mtmv") {
|
||||
sql """
|
||||
DROP MATERIALIZED VIEW ${mvName}
|
||||
"""
|
||||
def jobs = sql """show mtmv job for ${jobName}"""
|
||||
def jobs = sql """select count(1) from jobs("type"="mv") where name= '${jobName}'"""
|
||||
println jobs
|
||||
assertEquals(jobs.size(), 0);
|
||||
def tasks = sql """show mtmv job tasks for ${jobName}"""
|
||||
assertEquals(jobs.get(0).get(0), 0);
|
||||
def tasks = sql """select count(1) from tasks("type"="mv") where jobname = '${jobName}'"""
|
||||
println tasks
|
||||
assertEquals(tasks.size(), 0);
|
||||
assertEquals(tasks.get(0).get(0), 0);
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user