[feature](mtmv)add Job and task tvf (#27967)

add:
select * from jobs("type"="mv");
select * from tasks("type"="mv");
select * from jobs("type"="insert");
select * from tasks("type"="insert");

add check priv for mv_infos("database"="xxx");

change JobType MTMV==>MV
This commit is contained in:
zhangdong
2023-12-05 15:12:36 +08:00
committed by GitHub
parent 02512cd0e2
commit 6074cddcf8
30 changed files with 838 additions and 40 deletions

View File

@ -234,6 +234,12 @@ Status VMetaScanner::_fetch_metadata(const TMetaScanRange& meta_scan_range) {
case TMetadataType::MATERIALIZED_VIEWS:
RETURN_IF_ERROR(_build_materialized_views_metadata_request(meta_scan_range, &request));
break;
case TMetadataType::JOBS:
RETURN_IF_ERROR(_build_jobs_metadata_request(meta_scan_range, &request));
break;
case TMetadataType::TASKS:
RETURN_IF_ERROR(_build_tasks_metadata_request(meta_scan_range, &request));
break;
case TMetadataType::QUERIES:
RETURN_IF_ERROR(_build_queries_metadata_request(meta_scan_range, &request));
break;
@ -399,6 +405,44 @@ Status VMetaScanner::_build_materialized_views_metadata_request(
return Status::OK();
}
Status VMetaScanner::_build_jobs_metadata_request(const TMetaScanRange& meta_scan_range,
TFetchSchemaTableDataRequest* request) {
VLOG_CRITICAL << "VMetaScanner::_build_jobs_metadata_request";
if (!meta_scan_range.__isset.jobs_params) {
return Status::InternalError("Can not find TJobsMetadataParams from meta_scan_range.");
}
// create request
request->__set_schema_table_name(TSchemaTableName::METADATA_TABLE);
// create TMetadataTableRequestParams
TMetadataTableRequestParams metadata_table_params;
metadata_table_params.__set_metadata_type(TMetadataType::JOBS);
metadata_table_params.__set_jobs_metadata_params(meta_scan_range.jobs_params);
request->__set_metada_table_params(metadata_table_params);
return Status::OK();
}
Status VMetaScanner::_build_tasks_metadata_request(const TMetaScanRange& meta_scan_range,
TFetchSchemaTableDataRequest* request) {
VLOG_CRITICAL << "VMetaScanner::_build_tasks_metadata_request";
if (!meta_scan_range.__isset.tasks_params) {
return Status::InternalError("Can not find TTasksMetadataParams from meta_scan_range.");
}
// create request
request->__set_schema_table_name(TSchemaTableName::METADATA_TABLE);
// create TMetadataTableRequestParams
TMetadataTableRequestParams metadata_table_params;
metadata_table_params.__set_metadata_type(TMetadataType::TASKS);
metadata_table_params.__set_tasks_metadata_params(meta_scan_range.tasks_params);
request->__set_metada_table_params(metadata_table_params);
return Status::OK();
}
Status VMetaScanner::_build_queries_metadata_request(const TMetaScanRange& meta_scan_range,
TFetchSchemaTableDataRequest* request) {
VLOG_CRITICAL << "VMetaScanner::_build_queries_metadata_request";

View File

@ -83,6 +83,10 @@ private:
TFetchSchemaTableDataRequest* request);
Status _build_materialized_views_metadata_request(const TMetaScanRange& meta_scan_range,
TFetchSchemaTableDataRequest* request);
Status _build_jobs_metadata_request(const TMetaScanRange& meta_scan_range,
TFetchSchemaTableDataRequest* request);
Status _build_tasks_metadata_request(const TMetaScanRange& meta_scan_range,
TFetchSchemaTableDataRequest* request);
Status _build_queries_metadata_request(const TMetaScanRange& meta_scan_range,
TFetchSchemaTableDataRequest* request);
bool _meta_eos;

View File

@ -2602,11 +2602,11 @@ show_job_stmt ::=
:}
| KW_SHOW KW_MTMV KW_JOBS
{:
RESULT = new ShowJobStmt(null,org.apache.doris.job.common.JobType.MTMV);
RESULT = new ShowJobStmt(null,org.apache.doris.job.common.JobType.MV);
:}
| KW_SHOW KW_MTMV KW_JOB KW_FOR job_label:jobLabel
{:
RESULT = new ShowJobStmt(jobLabel,org.apache.doris.job.common.JobType.MTMV);
RESULT = new ShowJobStmt(jobLabel,org.apache.doris.job.common.JobType.MV);
:}
| KW_SHOW KW_JOB KW_FOR job_label:jobLabel
{:
@ -2618,7 +2618,7 @@ show_job_stmt ::=
:}
| KW_SHOW KW_MTMV KW_JOB KW_TASKS KW_FOR job_label:jobLabel
{:
RESULT = new ShowJobTaskStmt(jobLabel,org.apache.doris.job.common.JobType.MTMV);
RESULT = new ShowJobTaskStmt(jobLabel,org.apache.doris.job.common.JobType.MV);
:}
;
pause_job_stmt ::=

View File

@ -24,10 +24,13 @@ import org.apache.doris.nereids.trees.expressions.functions.table.FrontendsDisks
import org.apache.doris.nereids.trees.expressions.functions.table.GroupCommit;
import org.apache.doris.nereids.trees.expressions.functions.table.Hdfs;
import org.apache.doris.nereids.trees.expressions.functions.table.IcebergMeta;
import org.apache.doris.nereids.trees.expressions.functions.table.Jobs;
import org.apache.doris.nereids.trees.expressions.functions.table.Local;
import org.apache.doris.nereids.trees.expressions.functions.table.MvInfos;
import org.apache.doris.nereids.trees.expressions.functions.table.Numbers;
import org.apache.doris.nereids.trees.expressions.functions.table.Queries;
import org.apache.doris.nereids.trees.expressions.functions.table.S3;
import org.apache.doris.nereids.trees.expressions.functions.table.Tasks;
import org.apache.doris.nereids.trees.expressions.functions.table.WorkloadGroups;
import com.google.common.collect.ImmutableList;
@ -51,6 +54,9 @@ public class BuiltinTableValuedFunctions implements FunctionHelper {
tableValued(Numbers.class, "numbers"),
tableValued(Queries.class, "queries"),
tableValued(S3.class, "s3"),
tableValued(MvInfos.class, "mv_infos"),
tableValued(Jobs.class, "jobs"),
tableValued(Tasks.class, "tasks"),
tableValued(WorkloadGroups.class, "workload_groups")
);

View File

@ -30,6 +30,8 @@ import org.apache.doris.job.exception.JobException;
import org.apache.doris.job.task.AbstractTask;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.qe.ShowResultSetMetaData;
import org.apache.doris.thrift.TCell;
import org.apache.doris.thrift.TRow;
import com.google.common.collect.ImmutableList;
import com.google.gson.annotations.SerializedName;
@ -216,11 +218,30 @@ public abstract class AbstractJob<T extends AbstractTask> implements Job<T>, Wri
return commonShowInfo;
}
public TRow getCommonTvfInfo() {
TRow trow = new TRow();
trow.addToColumnValue(new TCell().setStringVal(String.valueOf(jobId)));
trow.addToColumnValue(new TCell().setStringVal(jobName));
trow.addToColumnValue(new TCell().setStringVal(createUser.getQualifiedUser()));
trow.addToColumnValue(new TCell().setStringVal(jobConfig.getExecuteType().name()));
trow.addToColumnValue(new TCell().setStringVal(jobConfig.convertRecurringStrategyToString()));
trow.addToColumnValue(new TCell().setStringVal(jobStatus.name()));
trow.addToColumnValue(new TCell().setStringVal(executeSql));
trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(createTimeMs)));
trow.addToColumnValue(new TCell().setStringVal(comment));
return trow;
}
@Override
public List<String> getShowInfo() {
return getCommonShowInfo();
}
@Override
public TRow getTvfInfo() {
return getCommonTvfInfo();
}
@Override
public ShowResultSetMetaData getJobMetaData() {
ShowResultSetMetaData.Builder builder = ShowResultSetMetaData.builder();

View File

@ -22,6 +22,7 @@ import org.apache.doris.job.common.TaskType;
import org.apache.doris.job.exception.JobException;
import org.apache.doris.job.task.AbstractTask;
import org.apache.doris.qe.ShowResultSetMetaData;
import org.apache.doris.thrift.TRow;
import java.util.List;
@ -116,4 +117,10 @@ public interface Job<T extends AbstractTask> {
* @return List<String> job common show info
*/
List<String> getShowInfo();
/**
* get info for tvf `jobs`
* @return TRow
*/
TRow getTvfInfo();
}

View File

@ -19,5 +19,5 @@ package org.apache.doris.job.common;
public enum JobType {
INSERT,
MTMV
MV
}

View File

@ -34,6 +34,8 @@ import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.ShowResultSetMetaData;
import org.apache.doris.qe.StmtExecutor;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.gson.annotations.SerializedName;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
@ -51,6 +53,27 @@ import java.util.concurrent.ConcurrentLinkedQueue;
@Slf4j
public class InsertJob extends AbstractJob<InsertTask> {
public static final ImmutableList<Column> SCHEMA = ImmutableList.of(
new Column("Id", ScalarType.createStringType()),
new Column("Name", ScalarType.createStringType()),
new Column("Definer", ScalarType.createStringType()),
new Column("ExecuteType", ScalarType.createStringType()),
new Column("RecurringStrategy", ScalarType.createStringType()),
new Column("Status", ScalarType.createStringType()),
new Column("ExecuteSql", ScalarType.createStringType()),
new Column("CreateTime", ScalarType.createStringType()),
new Column("Comment", ScalarType.createStringType()));
public static final ImmutableMap<String, Integer> COLUMN_TO_INDEX;
static {
ImmutableMap.Builder<String, Integer> builder = new ImmutableMap.Builder();
for (int i = 0; i < SCHEMA.size(); i++) {
builder.put(SCHEMA.get(i).getName().toLowerCase(), i);
}
COLUMN_TO_INDEX = builder.build();
}
@SerializedName(value = "lp")
String labelPrefix;

View File

@ -18,7 +18,9 @@
package org.apache.doris.job.extensions.insert;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.job.exception.JobException;
@ -29,9 +31,13 @@ import org.apache.doris.nereids.trees.plans.commands.InsertIntoTableCommand;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.TCell;
import org.apache.doris.thrift.TRow;
import org.apache.doris.thrift.TUniqueId;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import lombok.Getter;
import lombok.Setter;
@ -49,6 +55,29 @@ import java.util.concurrent.atomic.AtomicBoolean;
@Slf4j
public class InsertTask extends AbstractTask {
public static final ImmutableList<Column> SCHEMA = ImmutableList.of(
new Column("TaskId", ScalarType.createStringType()),
new Column("Label", ScalarType.createStringType()),
new Column("Status", ScalarType.createStringType()),
new Column("EtlInfo", ScalarType.createStringType()),
new Column("TaskInfo", ScalarType.createStringType()),
new Column("ErrorMsg", ScalarType.createStringType()),
new Column("CreateTimeMs", ScalarType.createStringType()),
new Column("FinishTimeMs", ScalarType.createStringType()),
new Column("TrackingUrl", ScalarType.createStringType()),
new Column("LoadStatistic", ScalarType.createStringType()),
new Column("User", ScalarType.createStringType()));
public static final ImmutableMap<String, Integer> COLUMN_TO_INDEX;
static {
ImmutableMap.Builder<String, Integer> builder = new ImmutableMap.Builder();
for (int i = 0; i < SCHEMA.size(); i++) {
builder.put(SCHEMA.get(i).getName().toLowerCase(), i);
}
COLUMN_TO_INDEX = builder.build();
}
private String labelName;
private InsertIntoTableCommand command;
@ -188,4 +217,45 @@ public class InsertTask extends AbstractTask {
return jobInfo;
}
@Override
public TRow getTvfInfo() {
TRow trow = new TRow();
if (loadJob == null) {
return trow;
}
trow.addToColumnValue(new TCell().setStringVal(String.valueOf(loadJob.getId())));
trow.addToColumnValue(new TCell().setStringVal(loadJob.getLabel()));
trow.addToColumnValue(new TCell().setStringVal(loadJob.getState().name()));
// etl info
String etlInfo = FeConstants.null_string;
if (!loadJob.getLoadingStatus().getCounters().isEmpty()) {
etlInfo = Joiner.on("; ").withKeyValueSeparator("=").join(loadJob.getLoadingStatus().getCounters());
}
trow.addToColumnValue(new TCell().setStringVal(etlInfo));
// task info
String taskInfo = "cluster:" + loadJob.getResourceName() + "; timeout(s):" + loadJob.getTimeout()
+ "; max_filter_ratio:" + loadJob.getMaxFilterRatio() + "; priority:" + loadJob.getPriority();
trow.addToColumnValue(new TCell().setStringVal(taskInfo));
// err msg
String errMsg = FeConstants.null_string;
if (loadJob.getFailMsg() != null) {
errMsg = "type:" + loadJob.getFailMsg().getCancelType() + "; msg:" + loadJob.getFailMsg().getMsg();
}
trow.addToColumnValue(new TCell().setStringVal(errMsg));
// create time
trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(loadJob.getCreateTimestamp())));
// load end time
trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(loadJob.getFinishTimestamp())));
// tracking url
trow.addToColumnValue(new TCell().setStringVal(loadJob.getLoadingStatus().getTrackingUrl()));
trow.addToColumnValue(new TCell().setStringVal(loadJob.getLoadStatistic().toJson()));
trow.addToColumnValue(new TCell().setStringVal(loadJob.getUserInfo().getQualifiedUser()));
return trow;
}
}

View File

@ -32,7 +32,11 @@ import org.apache.doris.job.common.JobType;
import org.apache.doris.job.common.TaskType;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.qe.ShowResultSetMetaData;
import org.apache.doris.thrift.TCell;
import org.apache.doris.thrift.TRow;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.gson.annotations.SerializedName;
import org.apache.commons.collections.CollectionUtils;
@ -56,6 +60,26 @@ public class MTMVJob extends AbstractJob<MTMVTask> {
.addColumn(new Column("CreateTime", ScalarType.createVarchar(20)))
.addColumn(new Column("Comment", ScalarType.createVarchar(20)))
.build();
public static final ImmutableList<Column> SCHEMA = ImmutableList.of(
new Column("Id", ScalarType.createStringType()),
new Column("Name", ScalarType.createStringType()),
new Column("ExecuteType", ScalarType.createStringType()),
new Column("RecurringStrategy", ScalarType.createStringType()),
new Column("Status", ScalarType.createStringType()),
new Column("CreateTime", ScalarType.createStringType()),
new Column("Comment", ScalarType.createStringType()));
public static final ImmutableMap<String, Integer> COLUMN_TO_INDEX;
static {
ImmutableMap.Builder<String, Integer> builder = new ImmutableMap.Builder();
for (int i = 0; i < SCHEMA.size(); i++) {
builder.put(SCHEMA.get(i).getName().toLowerCase(), i);
}
COLUMN_TO_INDEX = builder.build();
}
private static final ShowResultSetMetaData TASK_META_DATA =
ShowResultSetMetaData.builder()
.addColumn(new Column("JobId", ScalarType.createVarchar(20)))
@ -111,7 +135,7 @@ public class MTMVJob extends AbstractJob<MTMVTask> {
@Override
public JobType getJobType() {
return JobType.MTMV;
return JobType.MV;
}
@Override
@ -139,6 +163,19 @@ public class MTMVJob extends AbstractJob<MTMVTask> {
return data;
}
@Override
public TRow getTvfInfo() {
TRow trow = new TRow();
trow.addToColumnValue(new TCell().setStringVal(String.valueOf(super.getJobId())));
trow.addToColumnValue(new TCell().setStringVal(super.getJobName()));
trow.addToColumnValue(new TCell().setStringVal(super.getJobConfig().getExecuteType().name()));
trow.addToColumnValue(new TCell().setStringVal(super.getJobConfig().convertRecurringStrategyToString()));
trow.addToColumnValue(new TCell().setStringVal(super.getJobStatus().name()));
trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(super.getCreateTimeMs())));
trow.addToColumnValue(new TCell().setStringVal(super.getComment()));
return trow;
}
private MTMV getMTMV() throws DdlException, MetaNotFoundException {
Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(dbId);
return (MTMV) db.getTableOrMetaException(mtmvId, TableType.MATERIALIZED_VIEW);

View File

@ -18,9 +18,11 @@
package org.apache.doris.job.extensions.mtmv;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MTMV;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.TableIf.TableType;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.AnalysisException;
@ -36,8 +38,12 @@ import org.apache.doris.nereids.trees.plans.commands.info.TableNameInfo;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.TCell;
import org.apache.doris.thrift.TRow;
import org.apache.doris.thrift.TUniqueId;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.gson.annotations.SerializedName;
import org.apache.logging.log4j.LogManager;
@ -50,6 +56,27 @@ public class MTMVTask extends AbstractTask {
private static final Logger LOG = LogManager.getLogger(MTMVTask.class);
public static final Long MAX_HISTORY_TASKS_NUM = 100L;
public static final ImmutableList<Column> SCHEMA = ImmutableList.of(
new Column("TaskId", ScalarType.createStringType()),
new Column("JobId", ScalarType.createStringType()),
new Column("JobName", ScalarType.createStringType()),
new Column("Status", ScalarType.createStringType()),
new Column("CreateTime", ScalarType.createStringType()),
new Column("StartTime", ScalarType.createStringType()),
new Column("FinishTime", ScalarType.createStringType()),
new Column("DurationMs", ScalarType.createStringType()),
new Column("ExecuteSql", ScalarType.createStringType()));
public static final ImmutableMap<String, Integer> COLUMN_TO_INDEX;
static {
ImmutableMap.Builder<String, Integer> builder = new ImmutableMap.Builder();
for (int i = 0; i < SCHEMA.size(); i++) {
builder.put(SCHEMA.get(i).getName().toLowerCase(), i);
}
COLUMN_TO_INDEX = builder.build();
}
@SerializedName(value = "di")
private long dbId;
@SerializedName(value = "mi")
@ -130,6 +157,22 @@ public class MTMVTask extends AbstractTask {
return data;
}
@Override
public TRow getTvfInfo() {
TRow trow = new TRow();
trow.addToColumnValue(new TCell().setStringVal(String.valueOf(super.getTaskId())));
trow.addToColumnValue(new TCell().setStringVal(String.valueOf(super.getJobId())));
trow.addToColumnValue(new TCell().setStringVal(super.getJobName()));
trow.addToColumnValue(new TCell().setStringVal(super.getStatus().toString()));
trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(super.getCreateTimeMs())));
trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(super.getStartTimeMs())));
trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(super.getFinishTimeMs())));
trow.addToColumnValue(
new TCell().setStringVal(String.valueOf(super.getFinishTimeMs() - super.getStartTimeMs())));
trow.addToColumnValue(new TCell().setStringVal(sql));
return trow;
}
private static String generateSql(MTMV mtmv) {
StringBuilder builder = new StringBuilder();
builder.append("INSERT OVERWRITE TABLE ");

View File

@ -120,7 +120,7 @@ public class TaskDisruptorGroupManager<T extends AbstractTask> {
};
TaskDisruptor mtmvDisruptor = new TaskDisruptor<>(mtmvEventFactory, DISPATCH_MTMV_TASK_QUEUE_SIZE,
mtmvTaskThreadFactory, new BlockingWaitStrategy(), insertTaskExecutorHandlers, eventTranslator);
disruptorMap.put(JobType.MTMV, mtmvDisruptor);
disruptorMap.put(JobType.MV, mtmvDisruptor);
}
public void dispatchTimerJob(AbstractJob<T> job) {

View File

@ -18,6 +18,7 @@
package org.apache.doris.job.task;
import org.apache.doris.catalog.Env;
import org.apache.doris.job.base.AbstractJob;
import org.apache.doris.job.base.Job;
import org.apache.doris.job.common.TaskStatus;
import org.apache.doris.job.common.TaskType;
@ -122,4 +123,9 @@ public abstract class AbstractTask implements Task {
return status.equals(TaskStatus.CANCEL);
}
public String getJobName() {
AbstractJob job = Env.getCurrentEnv().getJobManager().getJob(jobId);
return job == null ? "" : job.getJobName();
}
}

View File

@ -18,6 +18,7 @@
package org.apache.doris.job.task;
import org.apache.doris.job.exception.JobException;
import org.apache.doris.thrift.TRow;
import java.util.List;
@ -72,4 +73,10 @@ public interface Task {
* @return List<String> task common show info
*/
List<String> getShowInfo();
/**
* get info for tvf `tasks`
* @return TRow
*/
TRow getTvfInfo();
}

View File

@ -118,7 +118,7 @@ public class MTMVJobManager implements MTMVHookService {
@Override
public void dropMTMV(MTMV mtmv) throws DdlException {
List<MTMVJob> jobs = Env.getCurrentEnv().getJobManager()
.queryJobs(JobType.MTMV, mtmv.getJobInfo().getJobName());
.queryJobs(JobType.MV, mtmv.getJobInfo().getJobName());
if (!CollectionUtils.isEmpty(jobs)) {
try {
Env.getCurrentEnv().getJobManager()
@ -165,7 +165,7 @@ public class MTMVJobManager implements MTMVHookService {
Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(info.getMvName().getDb());
MTMV mtmv = (MTMV) db.getTableOrMetaException(info.getMvName().getTbl(), TableType.MATERIALIZED_VIEW);
List<MTMVJob> jobs = Env.getCurrentEnv().getJobManager()
.queryJobs(JobType.MTMV, mtmv.getJobInfo().getJobName());
.queryJobs(JobType.MV, mtmv.getJobInfo().getJobName());
if (CollectionUtils.isEmpty(jobs) || jobs.size() != 1) {
throw new DdlException("jobs not normal,should have one job,but job num is: " + jobs.size());
}

View File

@ -0,0 +1,58 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.trees.expressions.functions.table;
import org.apache.doris.catalog.FunctionSignature;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.trees.expressions.Properties;
import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor;
import org.apache.doris.nereids.types.coercion.AnyDataType;
import org.apache.doris.tablefunction.JobsTableValuedFunction;
import org.apache.doris.tablefunction.TableValuedFunctionIf;
import java.util.Map;
/**
* jobs
*/
public class Jobs extends TableValuedFunction {
public Jobs(Properties properties) {
super("jobs", properties);
}
@Override
public FunctionSignature customSignature() {
return FunctionSignature.of(AnyDataType.INSTANCE_WITHOUT_INDEX, getArgumentsTypes());
}
@Override
protected TableValuedFunctionIf toCatalogFunction() {
try {
Map<String, String> arguments = getTVFProperties().getMap();
return new JobsTableValuedFunction(arguments);
} catch (Throwable t) {
throw new AnalysisException("Can not build JobsTableValuedFunction by "
+ this + ": " + t.getMessage(), t);
}
}
@Override
public <R, C> R accept(ExpressionVisitor<R, C> visitor, C context) {
return visitor.visitJobs(this, context);
}
}

View File

@ -0,0 +1,58 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.trees.expressions.functions.table;
import org.apache.doris.catalog.FunctionSignature;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.trees.expressions.Properties;
import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor;
import org.apache.doris.nereids.types.coercion.AnyDataType;
import org.apache.doris.tablefunction.MvInfosTableValuedFunction;
import org.apache.doris.tablefunction.TableValuedFunctionIf;
import java.util.Map;
/**
* mv_infos
*/
public class MvInfos extends TableValuedFunction {
public MvInfos(Properties properties) {
super("mv_infos", properties);
}
@Override
public FunctionSignature customSignature() {
return FunctionSignature.of(AnyDataType.INSTANCE_WITHOUT_INDEX, getArgumentsTypes());
}
@Override
protected TableValuedFunctionIf toCatalogFunction() {
try {
Map<String, String> arguments = getTVFProperties().getMap();
return new MvInfosTableValuedFunction(arguments);
} catch (Throwable t) {
throw new AnalysisException("Can not build MvInfosTableValuedFunction by "
+ this + ": " + t.getMessage(), t);
}
}
@Override
public <R, C> R accept(ExpressionVisitor<R, C> visitor, C context) {
return visitor.visitMvInfos(this, context);
}
}

View File

@ -0,0 +1,58 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.trees.expressions.functions.table;
import org.apache.doris.catalog.FunctionSignature;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.trees.expressions.Properties;
import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor;
import org.apache.doris.nereids.types.coercion.AnyDataType;
import org.apache.doris.tablefunction.TableValuedFunctionIf;
import org.apache.doris.tablefunction.TasksTableValuedFunction;
import java.util.Map;
/**
* tasks
*/
public class Tasks extends TableValuedFunction {
public Tasks(Properties properties) {
super("tasks", properties);
}
@Override
public FunctionSignature customSignature() {
return FunctionSignature.of(AnyDataType.INSTANCE_WITHOUT_INDEX, getArgumentsTypes());
}
@Override
protected TableValuedFunctionIf toCatalogFunction() {
try {
Map<String, String> arguments = getTVFProperties().getMap();
return new TasksTableValuedFunction(arguments);
} catch (Throwable t) {
throw new AnalysisException("Can not build TasksTableValuedFunction by "
+ this + ": " + t.getMessage(), t);
}
}
@Override
public <R, C> R accept(ExpressionVisitor<R, C> visitor, C context) {
return visitor.visitTasks(this, context);
}
}

View File

@ -24,11 +24,14 @@ import org.apache.doris.nereids.trees.expressions.functions.table.FrontendsDisks
import org.apache.doris.nereids.trees.expressions.functions.table.GroupCommit;
import org.apache.doris.nereids.trees.expressions.functions.table.Hdfs;
import org.apache.doris.nereids.trees.expressions.functions.table.IcebergMeta;
import org.apache.doris.nereids.trees.expressions.functions.table.Jobs;
import org.apache.doris.nereids.trees.expressions.functions.table.Local;
import org.apache.doris.nereids.trees.expressions.functions.table.MvInfos;
import org.apache.doris.nereids.trees.expressions.functions.table.Numbers;
import org.apache.doris.nereids.trees.expressions.functions.table.Queries;
import org.apache.doris.nereids.trees.expressions.functions.table.S3;
import org.apache.doris.nereids.trees.expressions.functions.table.TableValuedFunction;
import org.apache.doris.nereids.trees.expressions.functions.table.Tasks;
import org.apache.doris.nereids.trees.expressions.functions.table.WorkloadGroups;
/** TableValuedFunctionVisitor */
@ -47,6 +50,18 @@ public interface TableValuedFunctionVisitor<R, C> {
return visitTableValuedFunction(frontends, context);
}
default R visitMvInfos(MvInfos mvInfos, C context) {
return visitTableValuedFunction(mvInfos, context);
}
default R visitJobs(Jobs jobs, C context) {
return visitTableValuedFunction(jobs, context);
}
default R visitTasks(Tasks tasks, C context) {
return visitTableValuedFunction(tasks, context);
}
default R visitFrontendsDisks(FrontendsDisks frontendsDisks, C context) {
return visitTableValuedFunction(frontendsDisks, context);
}

View File

@ -0,0 +1,125 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.tablefunction;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Column;
import org.apache.doris.job.common.JobType;
import org.apache.doris.job.extensions.insert.InsertJob;
import org.apache.doris.job.extensions.mtmv.MTMVJob;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.thrift.TJobsMetadataParams;
import org.apache.doris.thrift.TMetaScanRange;
import org.apache.doris.thrift.TMetadataTableRequestParams;
import org.apache.doris.thrift.TMetadataType;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import java.util.List;
import java.util.Map;
/**
* The Implement of table valued function
* jobs("type" = "mv").
*/
public class JobsTableValuedFunction extends MetadataTableValuedFunction {
public static final String NAME = "jobs";
private static final String TYPE = "type";
private static final ImmutableSet<String> PROPERTIES_SET = ImmutableSet.of(TYPE);
private final JobType jobType;
public JobsTableValuedFunction(Map<String, String> params) throws AnalysisException {
Map<String, String> validParams = Maps.newHashMap();
for (String key : params.keySet()) {
if (!PROPERTIES_SET.contains(key.toLowerCase())) {
throw new AnalysisException("'" + key + "' is invalid property");
}
validParams.put(key.toLowerCase(), params.get(key));
}
String type = validParams.get(TYPE);
if (type == null) {
throw new AnalysisException("Invalid job metadata query");
}
JobType jobType = JobType.valueOf(type.toUpperCase());
if (jobType == null) {
throw new AnalysisException("Invalid job metadata query");
}
this.jobType = jobType;
UserIdentity userIdentity = ConnectContext.get().getCurrentUserIdentity();
if (!userIdentity.isRootUser()) {
throw new AnalysisException("only root user can operate");
}
}
public static Integer getColumnIndexFromColumnName(String columnName, TMetadataTableRequestParams params)
throws org.apache.doris.common.AnalysisException {
if (!params.isSetJobsMetadataParams()) {
throw new org.apache.doris.common.AnalysisException("Jobs metadata params is not set.");
}
TJobsMetadataParams jobMetadataParams = params.getJobsMetadataParams();
String type = jobMetadataParams.getType();
JobType jobType = JobType.valueOf(type.toUpperCase());
if (jobType == null) {
throw new AnalysisException("Invalid job metadata query");
}
if (JobType.MV == jobType) {
return MTMVJob.COLUMN_TO_INDEX.get(columnName.toLowerCase());
} else if (JobType.INSERT == jobType) {
return InsertJob.COLUMN_TO_INDEX.get(columnName.toLowerCase());
} else {
throw new AnalysisException("Invalid job type: " + jobType.toString());
}
}
@Override
public TMetadataType getMetadataType() {
return TMetadataType.JOBS;
}
@Override
public TMetaScanRange getMetaScanRange() {
TMetaScanRange metaScanRange = new TMetaScanRange();
metaScanRange.setMetadataType(TMetadataType.JOBS);
TJobsMetadataParams jobParam = new TJobsMetadataParams();
jobParam.setType(jobType.name());
jobParam.setCurrentUserIdent(ConnectContext.get().getCurrentUserIdentity().toThrift());
metaScanRange.setJobsParams(jobParam);
return metaScanRange;
}
@Override
public String getTableName() {
return "JobsTableValuedFunction";
}
@Override
public List<Column> getTableColumns() throws AnalysisException {
if (JobType.MV == jobType) {
return MTMVJob.SCHEMA;
} else if (JobType.INSERT == jobType) {
return InsertJob.SCHEMA;
} else {
throw new AnalysisException("Invalid job type: " + jobType.toString());
}
}
}

View File

@ -17,6 +17,7 @@
package org.apache.doris.tablefunction;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MTMV;
import org.apache.doris.catalog.Table;
@ -29,6 +30,9 @@ import org.apache.doris.common.util.NetUtils;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.job.common.JobType;
import org.apache.doris.job.task.AbstractTask;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.planner.external.iceberg.IcebergMetadataCache;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.QueryDetail;
@ -42,6 +46,7 @@ import org.apache.doris.thrift.TFetchSchemaTableDataRequest;
import org.apache.doris.thrift.TFetchSchemaTableDataResult;
import org.apache.doris.thrift.TIcebergMetadataParams;
import org.apache.doris.thrift.TIcebergQueryType;
import org.apache.doris.thrift.TJobsMetadataParams;
import org.apache.doris.thrift.TMaterializedViewsMetadataParams;
import org.apache.doris.thrift.TMetadataTableRequestParams;
import org.apache.doris.thrift.TMetadataType;
@ -50,6 +55,7 @@ import org.apache.doris.thrift.TQueriesMetadataParams;
import org.apache.doris.thrift.TRow;
import org.apache.doris.thrift.TStatus;
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TTasksMetadataParams;
import org.apache.doris.thrift.TUserIdentity;
import com.google.common.base.Stopwatch;
@ -100,6 +106,12 @@ public class MetadataGenerator {
case MATERIALIZED_VIEWS:
result = mtmvMetadataResult(params);
break;
case JOBS:
result = jobMetadataResult(params);
break;
case TASKS:
result = taskMetadataResult(params);
break;
case QUERIES:
result = queriesMetadataResult(params, request);
break;
@ -107,7 +119,7 @@ public class MetadataGenerator {
return errorResult("Metadata table params is not set.");
}
if (result.getStatus().getStatusCode() == TStatusCode.OK) {
filterColumns(result, params.getColumnsName(), params.getMetadataType());
filterColumns(result, params.getColumnsName(), params.getMetadataType(), params);
}
return result;
}
@ -461,14 +473,14 @@ public class MetadataGenerator {
}
private static void filterColumns(TFetchSchemaTableDataResult result,
List<String> columnNames, TMetadataType type) throws TException {
List<String> columnNames, TMetadataType type, TMetadataTableRequestParams params) throws TException {
List<TRow> fullColumnsRow = result.getDataBatch();
List<TRow> filterColumnsRows = Lists.newArrayList();
for (TRow row : fullColumnsRow) {
TRow filterRow = new TRow();
try {
for (String columnName : columnNames) {
Integer index = MetadataTableValuedFunction.getColumnIndexFromColumnName(type, columnName);
Integer index = MetadataTableValuedFunction.getColumnIndexFromColumnName(type, columnName, params);
filterRow.addToColumnValue(row.getColumnValue().get(index));
}
} catch (AnalysisException e) {
@ -492,6 +504,8 @@ public class MetadataGenerator {
TMaterializedViewsMetadataParams mtmvMetadataParams = params.getMaterializedViewsMetadataParams();
String dbName = mtmvMetadataParams.getDatabase();
TUserIdentity currentUserIdent = mtmvMetadataParams.getCurrentUserIdent();
UserIdentity userIdentity = UserIdentity.fromThrift(currentUserIdent);
List<TRow> dataBatch = Lists.newArrayList();
TFetchSchemaTableDataResult result = new TFetchSchemaTableDataResult();
List<Table> tables;
@ -505,6 +519,12 @@ public class MetadataGenerator {
for (Table table : tables) {
if (table instanceof MTMV) {
if (!Env.getCurrentEnv().getAccessManager()
.checkTblPriv(userIdentity, InternalCatalog.INTERNAL_CATALOG_NAME,
table.getQualifiedDbName(), table.getName(),
PrivPredicate.SHOW)) {
continue;
}
MTMV mv = (MTMV) table;
TRow trow = new TRow();
trow.addToColumnValue(new TCell().setLongVal(mv.getId()));
@ -524,5 +544,53 @@ public class MetadataGenerator {
result.setStatus(new TStatus(TStatusCode.OK));
return result;
}
private static TFetchSchemaTableDataResult jobMetadataResult(TMetadataTableRequestParams params) {
if (!params.isSetJobsMetadataParams()) {
return errorResult("Jobs metadata params is not set.");
}
TJobsMetadataParams jobsMetadataParams = params.getJobsMetadataParams();
String type = jobsMetadataParams.getType();
JobType jobType = JobType.valueOf(type);
List<TRow> dataBatch = Lists.newArrayList();
TFetchSchemaTableDataResult result = new TFetchSchemaTableDataResult();
List<org.apache.doris.job.base.AbstractJob> jobList = Env.getCurrentEnv().getJobManager().queryJobs(jobType);
for (org.apache.doris.job.base.AbstractJob job : jobList) {
dataBatch.add(job.getTvfInfo());
}
result.setDataBatch(dataBatch);
result.setStatus(new TStatus(TStatusCode.OK));
return result;
}
private static TFetchSchemaTableDataResult taskMetadataResult(TMetadataTableRequestParams params) {
if (!params.isSetTasksMetadataParams()) {
return errorResult("Tasks metadata params is not set.");
}
TTasksMetadataParams tasksMetadataParams = params.getTasksMetadataParams();
String type = tasksMetadataParams.getType();
JobType jobType = JobType.valueOf(type);
List<TRow> dataBatch = Lists.newArrayList();
TFetchSchemaTableDataResult result = new TFetchSchemaTableDataResult();
List<org.apache.doris.job.base.AbstractJob> jobList = Env.getCurrentEnv().getJobManager().queryJobs(jobType);
for (org.apache.doris.job.base.AbstractJob job : jobList) {
List<AbstractTask> tasks = job.queryTasks();
for (AbstractTask task : tasks) {
TRow tvfInfo = task.getTvfInfo();
if (tvfInfo != null) {
dataBatch.add(tvfInfo);
}
}
}
result.setDataBatch(dataBatch);
result.setStatus(new TStatus(TStatusCode.OK));
return result;
}
}

View File

@ -23,11 +23,13 @@ import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.planner.external.MetadataScanNode;
import org.apache.doris.thrift.TMetaScanRange;
import org.apache.doris.thrift.TMetadataTableRequestParams;
import org.apache.doris.thrift.TMetadataType;
public abstract class MetadataTableValuedFunction extends TableValuedFunctionIf {
public static Integer getColumnIndexFromColumnName(TMetadataType type, String columnName)
throws AnalysisException {
public static Integer getColumnIndexFromColumnName(TMetadataType type, String columnName,
TMetadataTableRequestParams params)
throws AnalysisException {
switch (type) {
case BACKENDS:
return BackendsTableValuedFunction.getColumnIndexFromColumnName(columnName);
@ -42,7 +44,11 @@ public abstract class MetadataTableValuedFunction extends TableValuedFunctionIf
case CATALOGS:
return CatalogsTableValuedFunction.getColumnIndexFromColumnName(columnName);
case MATERIALIZED_VIEWS:
return MaterializedViewsTableValuedFunction.getColumnIndexFromColumnName(columnName);
return MvInfosTableValuedFunction.getColumnIndexFromColumnName(columnName);
case JOBS:
return JobsTableValuedFunction.getColumnIndexFromColumnName(columnName, params);
case TASKS:
return TasksTableValuedFunction.getColumnIndexFromColumnName(columnName, params);
case QUERIES:
return QueriesTableValuedFunction.getColumnIndexFromColumnName(columnName);
default:

View File

@ -22,6 +22,7 @@ import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.TMaterializedViewsMetadataParams;
import org.apache.doris.thrift.TMetaScanRange;
@ -37,10 +38,10 @@ import java.util.Map;
/**
* The Implement of table valued function
* mtmvs("database" = "db1").
* mv_infos("database" = "db1").
*/
public class MaterializedViewsTableValuedFunction extends MetadataTableValuedFunction {
public static final String NAME = "mtmvs";
public class MvInfosTableValuedFunction extends MetadataTableValuedFunction {
public static final String NAME = "mv_infos";
private static final String DB = "database";
private static final ImmutableSet<String> PROPERTIES_SET = ImmutableSet.of(DB);
@ -73,7 +74,7 @@ public class MaterializedViewsTableValuedFunction extends MetadataTableValuedFun
private final String databaseName;
public MaterializedViewsTableValuedFunction(Map<String, String> params) throws AnalysisException {
public MvInfosTableValuedFunction(Map<String, String> params) throws AnalysisException {
Map<String, String> validParams = Maps.newHashMap();
for (String key : params.keySet()) {
if (!PROPERTIES_SET.contains(key.toLowerCase())) {
@ -100,13 +101,14 @@ public class MaterializedViewsTableValuedFunction extends MetadataTableValuedFun
metaScanRange.setMetadataType(TMetadataType.MATERIALIZED_VIEWS);
TMaterializedViewsMetadataParams mtmvParam = new TMaterializedViewsMetadataParams();
mtmvParam.setDatabase(databaseName);
mtmvParam.setCurrentUserIdent(ConnectContext.get().getCurrentUserIdentity().toThrift());
metaScanRange.setMaterializedViewsParams(mtmvParam);
return metaScanRange;
}
@Override
public String getTableName() {
return "MaterializedViewsTableValuedFunction";
return "MvInfosTableValuedFunction";
}
@Override

View File

@ -66,8 +66,12 @@ public abstract class TableValuedFunctionIf {
return new WorkloadGroupsTableValuedFunction(params);
case CatalogsTableValuedFunction.NAME:
return new CatalogsTableValuedFunction(params);
case MaterializedViewsTableValuedFunction.NAME:
return new MaterializedViewsTableValuedFunction(params);
case MvInfosTableValuedFunction.NAME:
return new MvInfosTableValuedFunction(params);
case JobsTableValuedFunction.NAME:
return new JobsTableValuedFunction(params);
case TasksTableValuedFunction.NAME:
return new TasksTableValuedFunction(params);
case GroupCommitTableValuedFunction.NAME:
return new GroupCommitTableValuedFunction(params);
case QueriesTableValuedFunction.NAME:

View File

@ -0,0 +1,125 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.tablefunction;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Column;
import org.apache.doris.job.common.JobType;
import org.apache.doris.job.extensions.insert.InsertTask;
import org.apache.doris.job.extensions.mtmv.MTMVTask;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.thrift.TMetaScanRange;
import org.apache.doris.thrift.TMetadataTableRequestParams;
import org.apache.doris.thrift.TMetadataType;
import org.apache.doris.thrift.TTasksMetadataParams;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import java.util.List;
import java.util.Map;
/**
* The Implement of table valued function
* tasks("type" = "mv").
*/
public class TasksTableValuedFunction extends MetadataTableValuedFunction {
public static final String NAME = "tasks";
private static final String TYPE = "type";
private static final ImmutableSet<String> PROPERTIES_SET = ImmutableSet.of(TYPE);
private final JobType jobType;
public TasksTableValuedFunction(Map<String, String> params) throws AnalysisException {
Map<String, String> validParams = Maps.newHashMap();
for (String key : params.keySet()) {
if (!PROPERTIES_SET.contains(key.toLowerCase())) {
throw new AnalysisException("'" + key + "' is invalid property");
}
validParams.put(key.toLowerCase(), params.get(key));
}
String type = validParams.get(TYPE);
if (type == null) {
throw new AnalysisException("Invalid task metadata query");
}
JobType jobType = JobType.valueOf(type.toUpperCase());
if (jobType == null) {
throw new AnalysisException("Invalid task metadata query");
}
this.jobType = jobType;
UserIdentity userIdentity = ConnectContext.get().getCurrentUserIdentity();
if (!userIdentity.isRootUser()) {
throw new AnalysisException("only root user can operate");
}
}
public static Integer getColumnIndexFromColumnName(String columnName, TMetadataTableRequestParams params)
throws org.apache.doris.common.AnalysisException {
if (!params.isSetTasksMetadataParams()) {
throw new org.apache.doris.common.AnalysisException("Tasks metadata params is not set.");
}
TTasksMetadataParams taskMetadataParams = params.getTasksMetadataParams();
String type = taskMetadataParams.getType();
JobType jobType = JobType.valueOf(type.toUpperCase());
if (jobType == null) {
throw new AnalysisException("Invalid task metadata query");
}
if (JobType.MV == jobType) {
return MTMVTask.COLUMN_TO_INDEX.get(columnName.toLowerCase());
} else if (JobType.INSERT == jobType) {
return InsertTask.COLUMN_TO_INDEX.get(columnName.toLowerCase());
} else {
throw new AnalysisException("Invalid job type: " + jobType.toString());
}
}
@Override
public TMetadataType getMetadataType() {
return TMetadataType.TASKS;
}
@Override
public TMetaScanRange getMetaScanRange() {
TMetaScanRange metaScanRange = new TMetaScanRange();
metaScanRange.setMetadataType(TMetadataType.TASKS);
TTasksMetadataParams taskParam = new TTasksMetadataParams();
taskParam.setType(jobType.name());
taskParam.setCurrentUserIdent(ConnectContext.get().getCurrentUserIdentity().toThrift());
metaScanRange.setTasksParams(taskParam);
return metaScanRange;
}
@Override
public String getTableName() {
return "TasksTableValuedFunction";
}
@Override
public List<Column> getTableColumns() throws AnalysisException {
if (JobType.MV == jobType) {
return MTMVTask.SCHEMA;
} else if (JobType.INSERT == jobType) {
return InsertTask.SCHEMA;
} else {
throw new AnalysisException("Invalid job type: " + jobType.toString());
}
}
}

View File

@ -904,6 +904,8 @@ struct TMetadataTableRequestParams {
6: optional Types.TUserIdentity current_user_ident
7: optional PlanNodes.TQueriesMetadataParams queries_metadata_params
8: optional PlanNodes.TMaterializedViewsMetadataParams materialized_views_metadata_params
9: optional PlanNodes.TJobsMetadataParams jobs_metadata_params
10: optional PlanNodes.TTasksMetadataParams tasks_metadata_params
}
struct TFetchSchemaTableDataRequest {

View File

@ -475,12 +475,25 @@ struct TFrontendsMetadataParams {
struct TMaterializedViewsMetadataParams {
1: optional string database
2: optional Types.TUserIdentity current_user_ident
}
struct TJobsMetadataParams {
1: optional string type
2: optional Types.TUserIdentity current_user_ident
}
struct TTasksMetadataParams {
1: optional string type
2: optional Types.TUserIdentity current_user_ident
}
struct TQueriesMetadataParams {
1: optional string cluster_name
2: optional bool relay_to_other_fe
3: optional TMaterializedViewsMetadataParams materialized_views_params
4: optional TJobsMetadataParams jobs_params
5: optional TTasksMetadataParams tasks_params
}
struct TMetaScanRange {
@ -490,6 +503,8 @@ struct TMetaScanRange {
4: optional TFrontendsMetadataParams frontends_params
5: optional TQueriesMetadataParams queries_params
6: optional TMaterializedViewsMetadataParams materialized_views_params
7: optional TJobsMetadataParams jobs_params
8: optional TTasksMetadataParams tasks_params
}
// Specification of an individual data range which is held in its entirety

View File

@ -699,6 +699,8 @@ enum TMetadataType {
CATALOGS,
FRONTENDS_DISKS,
MATERIALIZED_VIEWS,
JOBS,
TASKS,
QUERIES,
}

View File

@ -842,12 +842,8 @@ class Suite implements GroovyInterceptable {
void waitingMTMVTaskFinished(String jobName) {
Thread.sleep(2000);
String showTasks = "SHOW MTMV JOB TASKS FOR ${jobName}"
List<List<String>> showTaskMetaResult = sql_meta(showTasks)
logger.info("showTaskMetaResult: " + showTaskMetaResult.toString())
int index = showTaskMetaResult.indexOf(['Status', 'CHAR'])
logger.info("index: " + index)
String status = "PENDING"
String showTasks = "select Status from tasks('type'='mv') where JobName = '${jobName}'"
String status = "NULL"
List<List<Object>> result
long startTime = System.currentTimeMillis()
long timeoutTimestamp = startTime + 5 * 60 * 1000 // 5 min
@ -855,7 +851,7 @@ class Suite implements GroovyInterceptable {
result = sql(showTasks)
logger.info("result: " + result.toString())
if (!result.isEmpty()) {
status = result.last().get(index)
status = result.last().get(0)
}
logger.info("The state of ${showTasks} is ${status}")
Thread.sleep(1000);
@ -867,18 +863,14 @@ class Suite implements GroovyInterceptable {
}
String getJobName(String dbName, String mtmvName) {
String showMTMV = "select * from mtmvs('database'='${dbName}') where Name = '${mtmvName}'";
String showMTMV = "select JobName from mv_infos('database'='${dbName}') where Name = '${mtmvName}'";
logger.info(showMTMV)
List<List<String>> showTaskMetaResult = sql_meta(showMTMV)
logger.info("showTaskMetaResult: " + showTaskMetaResult.toString())
int index = showTaskMetaResult.indexOf(['JobName', 'TINYTEXT'])
logger.info("index: " + index)
List<List<Object>> result = sql(showMTMV)
logger.info("result: " + result.toString())
if (result.isEmpty()) {
Assert.fail();
}
return result.last().get(index);
return result.last().get(0);
}
}

View File

@ -48,18 +48,18 @@ suite("test_base_mtmv") {
SELECT * FROM ${tableName};
"""
def jobName = getJobName("regression_test_mtmv_p0", mvName);
order_qt_status "select Name,State,RefreshState from mtmvs('database'='${dbName}') where Name='${mvName}'"
order_qt_status "select Name,State,RefreshState from mv_infos('database'='${dbName}') where Name='${mvName}'"
sql """
REFRESH MATERIALIZED VIEW ${mvName}
"""
waitingMTMVTaskFinished(jobName)
order_qt_status "select Name,State,RefreshState from mtmvs('database'='${dbName}') where Name='${mvName}'"
order_qt_status "select Name,State,RefreshState from mv_infos('database'='${dbName}') where Name='${mvName}'"
// alter table
sql """
alter table ${tableName} add COLUMN new_col INT AFTER username;
"""
order_qt_status "select Name,State,RefreshState from mtmvs('database'='${dbName}') where Name='${mvName}'"
order_qt_status "select Name,State,RefreshState from mv_infos('database'='${dbName}') where Name='${mvName}'"
sql """
alter table ${tableName} drop COLUMN new_col;
"""
@ -67,13 +67,13 @@ suite("test_base_mtmv") {
REFRESH MATERIALIZED VIEW ${mvName}
"""
waitingMTMVTaskFinished(jobName)
order_qt_status "select Name,State,RefreshState from mtmvs('database'='${dbName}') where Name='${mvName}'"
order_qt_status "select Name,State,RefreshState from mv_infos('database'='${dbName}') where Name='${mvName}'"
// drop table
sql """
drop table ${tableName}
"""
order_qt_status "select Name,State,RefreshState from mtmvs('database'='${dbName}') where Name='${mvName}'"
order_qt_status "select Name,State,RefreshState from mv_infos('database'='${dbName}') where Name='${mvName}'"
sql """
CREATE TABLE IF NOT EXISTS `${tableName}` (
event_day DATE,
@ -89,7 +89,7 @@ suite("test_base_mtmv") {
REFRESH MATERIALIZED VIEW ${mvName}
"""
waitingMTMVTaskFinished(jobName)
order_qt_status "select Name,State,RefreshState from mtmvs('database'='${dbName}') where Name='${mvName}'"
order_qt_status "select Name,State,RefreshState from mv_infos('database'='${dbName}') where Name='${mvName}'"
sql """
DROP MATERIALIZED VIEW ${mvName}
"""