diff --git a/docs/en/docs/sql-manual/sql-reference/Show-Statements/SHOW-PROC.md b/docs/en/docs/sql-manual/sql-reference/Show-Statements/SHOW-PROC.md
index 6b555cbb07..f170da70e0 100644
--- a/docs/en/docs/sql-manual/sql-reference/Show-Statements/SHOW-PROC.md
+++ b/docs/en/docs/sql-manual/sql-reference/Show-Statements/SHOW-PROC.md
@@ -87,7 +87,7 @@ illustrate:
 3. frontends: Display all FE node information in the cluster, including IP address, role, status, whether it is the master, etc., equivalent to [SHOW FRONTENDS](./SHOW-FRONTENDS.md)
 4. routine_loads: Display all routine load job information, including job name, status, etc.
 5. auth: User name and corresponding permission information
-6. jobs:
+6. jobs: Show statistics of all kinds of jobs. If a specific `dbId` is given, the statistics of that database are returned. If `dbId` is -1, the total statistics of all databases are returned
 7. bdbje: To view the bdbje database list, you need to modify the `fe.conf` file to add `enable_bdbje_debug_mode=true`, and then start `FE` through `sh start_fe.sh --daemon` to enter the `debug` mode. After entering `debug` mode, only `http server` and `MySQLServer` will be started and the `BDBJE` instance will be opened, but no metadata loading and subsequent startup processes will be entered.
 8. dbs: Mainly used to view the metadata information of each database and the tables in the Doris cluster. This information includes table structure, partitions, materialized views, data shards and replicas, and more. Through this directory and its subdirectories, you can clearly display the table metadata in the cluster, and locate some problems such as data skew, replica failure, etc.
 9. resources: View system resources. Ordinary accounts can only see resources for which they have USAGE_PRIV permission. Only the root and admin accounts can see all resources. Equivalent to [SHOW RESOURCES](./SHOW-RESOURCES.md)
diff --git a/docs/zh-CN/docs/sql-manual/sql-reference/Show-Statements/SHOW-PROC.md b/docs/zh-CN/docs/sql-manual/sql-reference/Show-Statements/SHOW-PROC.md
index ba46b4b630..7ddee944ae 100644
--- a/docs/zh-CN/docs/sql-manual/sql-reference/Show-Statements/SHOW-PROC.md
+++ b/docs/zh-CN/docs/sql-manual/sql-reference/Show-Statements/SHOW-PROC.md
@@ -87,7 +87,7 @@ mysql> show proc "/";
 3. frontends: Displays information about all FE nodes in the cluster, including IP address, role, status, whether it is the master, etc. Equivalent to [SHOW FRONTENDS](./SHOW-FRONTENDS.md)
 4. routine_loads: Displays all routine load job information, including job name, status, etc.
 5. auth: User names and their corresponding permission information
-6. jobs:
+6. jobs: Statistics of all kinds of jobs. The statistics of the jobs of a specified database can be viewed; if `dbId` = -1, the aggregated statistics of all databases are returned
 7. bdbje: Views the bdbje database list. You need to modify the `fe.conf` file to add `enable_bdbje_debug_mode=true`, then start `FE` via `sh start_fe.sh --daemon` to enter `debug` mode. After entering `debug` mode, only the `http server` and `MySQLServer` are started and the `BDBJE` instance is opened, but no metadata loading or subsequent startup steps are executed.
 8. dbs: Mainly used to view the metadata of each database and its tables in the Doris cluster, including table structure, partitions, materialized views, data shards, replicas, and so on. Through this directory and its subdirectories, you can clearly see the table metadata in the cluster and locate problems such as data skew and replica failures
 9. resources: Views system resources. Ordinary accounts can only see resources on which they have USAGE_PRIV permission. Only the root and admin accounts can see all resources. Equivalent to [SHOW RESOURCES](./SHOW-RESOURCES.md)
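To see the new behavior end to end, the proc path can be queried over the MySQL protocol like any other statement. Below is a minimal sketch assuming a MySQL-compatible JDBC driver on the classpath and an FE listening on the default query port; the host, user, and password are placeholders, and the column layout follows JobsProcDir.TITLE_NAMES (job type plus pending/running/finished/cancelled/total counts). SHOW PROC itself requires the ADMIN privilege.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class ShowJobsExample {
    public static void main(String[] args) throws Exception {
        // Placeholder FE address and credentials; replace with your own.
        String url = "jdbc:mysql://127.0.0.1:9030?useSSL=false";
        try (Connection conn = DriverManager.getConnection(url, "root", "");
                Statement stmt = conn.createStatement();
                // dbId = -1 returns the aggregated statistics of all databases
                ResultSet rs = stmt.executeQuery("SHOW PROC '/jobs/-1'")) {
            while (rs.next()) {
                // Columns (per JobsProcDir): JobType, Pending, Running, Finished, Cancelled, Total
                System.out.printf("%s: total=%s%n", rs.getString(1), rs.getString(6));
            }
        }
    }
}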
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java
index c408f9690d..80a0e33393 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java
@@ -140,7 +140,16 @@ public abstract class AlterHandler extends MasterDaemon {
     }
 
     public Long getAlterJobV2Num(org.apache.doris.alter.AlterJobV2.JobState state) {
-        return alterJobsV2.values().stream().filter(e -> e.getJobState() == state).count();
+        Long counter = 0L;
+
+        for (AlterJobV2 job : alterJobsV2.values()) {
+            // no need to check priv here. This method is only called in show proc stmt,
+            // which already checks the ADMIN priv.
+            if (job.getJobState() == state) {
+                counter++;
+            }
+        }
+        return counter;
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java
index 5a7ff739ca..f6fc944c92 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java
@@ -65,6 +65,7 @@ import org.apache.doris.thrift.TStorageMedium;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
@@ -1146,6 +1147,18 @@ public class MaterializedViewHandler extends AlterHandler {
         return rollupJobInfos;
     }
 
+    public List<List<Comparable>> getAllAlterJobInfos() {
+        List<List<Comparable>> rollupJobInfos = new LinkedList<List<Comparable>>();
+
+        for (AlterJobV2 alterJob : ImmutableList.copyOf(alterJobsV2.values())) {
+            // no need to check priv here. This method is only called in show proc stmt,
+            // which already checks the ADMIN priv.
+            alterJob.getInfo(rollupJobInfos);
+        }
+
+        return rollupJobInfos;
+    }
+
     private void getAlterJobV2Infos(Database db, List<List<Comparable>> rollupJobInfos) {
         ConnectContext ctx = ConnectContext.get();
         for (AlterJobV2 alterJob : alterJobsV2.values()) {
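getAllAlterJobInfos iterates ImmutableList.copyOf(alterJobsV2.values()) rather than the live value view, so jobs added or removed concurrently cannot perturb the traversal. A minimal self-contained sketch of the snapshot pattern, assuming Guava on the classpath as in the FE:

import com.google.common.collect.ImmutableList;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class SnapshotIterationSketch {
    private final Map<Long, String> jobs = new ConcurrentHashMap<>();

    public List<String> snapshotValues() {
        // Copy once, then iterate the copy: later put()/remove() calls on
        // the concurrent map are invisible to this traversal.
        return ImmutableList.copyOf(jobs.values());
    }

    public static void main(String[] args) {
        SnapshotIterationSketch s = new SnapshotIterationSketch();
        s.jobs.put(1L, "PENDING");
        List<String> snapshot = s.snapshotValues();
        s.jobs.put(2L, "RUNNING"); // not visible in the snapshot
        System.out.println(snapshot); // [PENDING]
    }
}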
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
index 11fb77c3ee..a2b4156e38 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
@@ -1599,6 +1599,20 @@ public class SchemaChangeHandler extends AlterHandler {
         });
     }
 
+    public List<List<Comparable>> getAllAlterJobInfos() {
+        List<List<Comparable>> schemaChangeJobInfos = new LinkedList<>();
+        for (AlterJobV2 alterJob : ImmutableList.copyOf(alterJobsV2.values())) {
+            // no need to check priv here. This method is only called in show proc stmt,
+            // which already checks the ADMIN priv.
+            alterJob.getInfo(schemaChangeJobInfos);
+        }
+
+        // sort by "JobId", "PartitionName", "CreateTime", "FinishTime", "IndexName", "IndexState"
+        ListComparator<List<Comparable>> comparator = new ListComparator<List<Comparable>>(0, 1, 2, 3, 4, 5);
+        schemaChangeJobInfos.sort(comparator);
+        return schemaChangeJobInfos;
+    }
+
     @Override
     public List<List<Comparable>> getAlterJobInfosByDb(Database db) {
         List<List<Comparable>> schemaChangeJobInfos = new LinkedList<>();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/ExportProcNode.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/ExportProcNode.java
index 4c469a3039..1a154c8fd9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/ExportProcNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/ExportProcNode.java
@@ -51,14 +51,18 @@ public class ExportProcNode implements ProcNodeInterface {
 
     @Override
     public ProcResult fetchResult() throws AnalysisException {
-        Preconditions.checkNotNull(db);
         Preconditions.checkNotNull(exportMgr);
 
         BaseProcResult result = new BaseProcResult();
         result.setNames(TITLE_NAMES);
 
-        List<List<String>> jobInfos = exportMgr.getExportJobInfosByIdOrState(
+        List<List<String>> jobInfos;
+        if (db == null) {
+            jobInfos = exportMgr.getExportJobInfos(LIMIT);
+        } else {
+            jobInfos = exportMgr.getExportJobInfosByIdOrState(
                 db.getId(), 0, "", false, null, null, LIMIT);
+        }
         result.setRows(jobInfos);
         return result;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/JobsDbProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/JobsDbProcDir.java
index 2957d7133e..414d6912d9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/JobsDbProcDir.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/JobsDbProcDir.java
@@ -60,7 +60,8 @@ public class JobsDbProcDir implements ProcDirInterface {
             throw new AnalysisException("Invalid db id format: " + dbIdStr);
        }
 
-        Database db = env.getInternalCatalog().getDbOrAnalysisException(dbId);
+        // dbId = -1 means we need the total result of all databases
+        Database db = dbId == -1 ? null : env.getInternalCatalog().getDbOrAnalysisException(dbId);
         return new JobsProcDir(env, db);
     }
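The new SchemaChangeHandler.getAllAlterJobInfos sorts its rows with Doris's ListComparator, which compares the given column indices in order. A rough standalone analogue of that helper (this class is an illustration, not the FE implementation):

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class RowComparatorSketch implements Comparator<List<Comparable>> {
    private final int[] indices;

    public RowComparatorSketch(int... indices) {
        this.indices = indices;
    }

    @Override
    @SuppressWarnings("unchecked")
    public int compare(List<Comparable> a, List<Comparable> b) {
        // Compare the given columns in order; the first difference decides.
        for (int i : indices) {
            int cmp = a.get(i).compareTo(b.get(i));
            if (cmp != 0) {
                return cmp;
            }
        }
        return 0;
    }

    public static void main(String[] args) {
        List<List<Comparable>> rows = new ArrayList<>();
        rows.add(List.of(2L, "p1"));
        rows.add(List.of(1L, "p2"));
        rows.sort(new RowComparatorSketch(0, 1)); // sort by JobId, then PartitionName
        System.out.println(rows); // [[1, p2], [2, p1]]
    }
}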
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/JobsProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/JobsProcDir.java
index 3d93d31e7d..597066536b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/JobsProcDir.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/JobsProcDir.java
@@ -24,7 +24,6 @@ import org.apache.doris.catalog.Env;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.load.ExportJob;
 import org.apache.doris.load.ExportMgr;
-import org.apache.doris.load.Load;
 import org.apache.doris.load.loadv2.LoadManager;
 
 import com.google.common.base.Preconditions;
@@ -68,9 +67,10 @@ public class JobsProcDir implements ProcDirInterface {
         }
 
         if (jobTypeName.equals(LOAD)) {
-            return new LoadProcDir(env.getLoadInstance(), db);
+            return new LoadProcDir(env.getCurrentEnv().getLoadManager(), db);
         } else if (jobTypeName.equals(DELETE)) {
-            return new DeleteInfoProcDir(env.getDeleteHandler(), env.getLoadInstance(), db.getId());
+            Long dbId = db == null ? -1 : db.getId();
+            return new DeleteInfoProcDir(env.getDeleteHandler(), env.getLoadInstance(), dbId);
         } else if (jobTypeName.equals(ROLLUP)) {
             return new RollupProcDir(env.getMaterializedViewHandler(), db);
         } else if (jobTypeName.equals(SCHEMA_CHANGE)) {
@@ -90,23 +90,22 @@ public class JobsProcDir implements ProcDirInterface {
 
         result.setNames(TITLE_NAMES);
 
+        // db is null means we need the total result of all databases
+        if (db == null) {
+            return fetchResultForAllDbs();
+        }
+
         long dbId = db.getId();
+
         // load
-        Load load = Env.getCurrentEnv().getLoadInstance();
         LoadManager loadManager = Env.getCurrentEnv().getLoadManager();
-        Long pendingNum = load.getLoadJobNum(org.apache.doris.load.LoadJob.JobState.PENDING, dbId)
-                + loadManager.getLoadJobNum(org.apache.doris.load.loadv2.JobState.PENDING, dbId);
-        Long runningNum = load.getLoadJobNum(org.apache.doris.load.LoadJob.JobState.ETL, dbId)
-                + load.getLoadJobNum(org.apache.doris.load.LoadJob.JobState.LOADING, dbId)
-                + loadManager.getLoadJobNum(org.apache.doris.load.loadv2.JobState.LOADING, dbId);
-        Long finishedNum = load.getLoadJobNum(org.apache.doris.load.LoadJob.JobState.QUORUM_FINISHED, dbId)
-                + load.getLoadJobNum(org.apache.doris.load.LoadJob.JobState.FINISHED, dbId)
-                + loadManager.getLoadJobNum(org.apache.doris.load.loadv2.JobState.FINISHED, dbId);
-        Long cancelledNum = load.getLoadJobNum(org.apache.doris.load.LoadJob.JobState.CANCELLED, dbId)
-                + loadManager.getLoadJobNum(org.apache.doris.load.loadv2.JobState.CANCELLED, dbId);
+        Long pendingNum = Long.valueOf(loadManager.getLoadJobNum(org.apache.doris.load.loadv2.JobState.PENDING, dbId));
+        Long runningNum = Long.valueOf(loadManager.getLoadJobNum(org.apache.doris.load.loadv2.JobState.LOADING, dbId));
+        Long finishedNum = Long.valueOf(loadManager.getLoadJobNum(org.apache.doris.load.loadv2.JobState.FINISHED, dbId));
+        Long cancelledNum = Long.valueOf(loadManager.getLoadJobNum(org.apache.doris.load.loadv2.JobState.CANCELLED, dbId));
         Long totalNum = pendingNum + runningNum + finishedNum + cancelledNum;
         result.addRow(Lists.newArrayList(LOAD, pendingNum.toString(), runningNum.toString(), finishedNum.toString(),
-                cancelledNum.toString(), totalNum.toString()));
+                cancelledNum.toString(), totalNum.toString()));
 
         // delete
         // TODO: find it from delete handler
@@ -155,4 +154,66 @@ public class JobsProcDir implements ProcDirInterface {
 
         return result;
     }
+
+    public ProcResult fetchResultForAllDbs() {
+        BaseProcResult result = new BaseProcResult();
+        result.setNames(TITLE_NAMES);
+
+        // load
+        LoadManager loadManager = Env.getCurrentEnv().getLoadManager();
+        Long pendingNum = Long.valueOf(loadManager.getLoadJobNum(org.apache.doris.load.loadv2.JobState.PENDING));
+        Long runningNum = Long.valueOf(loadManager.getLoadJobNum(org.apache.doris.load.loadv2.JobState.LOADING));
+        Long finishedNum = Long.valueOf(loadManager.getLoadJobNum(org.apache.doris.load.loadv2.JobState.FINISHED));
+        Long cancelledNum = Long.valueOf(loadManager.getLoadJobNum(org.apache.doris.load.loadv2.JobState.CANCELLED));
+        Long totalNum = pendingNum + runningNum + finishedNum + cancelledNum;
+        result.addRow(Lists.newArrayList(LOAD, pendingNum.toString(), runningNum.toString(), finishedNum.toString(),
+                cancelledNum.toString(), totalNum.toString()));
+
+        // delete
+        // TODO: find it from delete handler
+        pendingNum = 0L;
+        runningNum = 0L;
+        finishedNum = 0L;
+        cancelledNum = 0L;
+        totalNum = pendingNum + runningNum + finishedNum + cancelledNum;
+        result.addRow(Lists.newArrayList(DELETE, pendingNum.toString(), runningNum.toString(), finishedNum.toString(),
+                cancelledNum.toString(), totalNum.toString()));
+
+        // rollup
+        MaterializedViewHandler materializedViewHandler = Env.getCurrentEnv().getMaterializedViewHandler();
+        pendingNum = materializedViewHandler.getAlterJobV2Num(org.apache.doris.alter.AlterJobV2.JobState.PENDING);
+        runningNum = materializedViewHandler.getAlterJobV2Num(org.apache.doris.alter.AlterJobV2.JobState.WAITING_TXN)
+                + materializedViewHandler.getAlterJobV2Num(org.apache.doris.alter.AlterJobV2.JobState.RUNNING);
+        finishedNum = materializedViewHandler.getAlterJobV2Num(org.apache.doris.alter.AlterJobV2.JobState.FINISHED);
+        cancelledNum = materializedViewHandler.getAlterJobV2Num(org.apache.doris.alter.AlterJobV2.JobState.CANCELLED);
+        totalNum = pendingNum + runningNum + finishedNum + cancelledNum;
+        result.addRow(Lists.newArrayList(ROLLUP, pendingNum.toString(), runningNum.toString(), finishedNum.toString(),
+                cancelledNum.toString(), totalNum.toString()));
+
+        // schema change
+        SchemaChangeHandler schemaChangeHandler = Env.getCurrentEnv().getSchemaChangeHandler();
+        pendingNum = schemaChangeHandler.getAlterJobV2Num(org.apache.doris.alter.AlterJobV2.JobState.PENDING);
+        runningNum = schemaChangeHandler.getAlterJobV2Num(org.apache.doris.alter.AlterJobV2.JobState.WAITING_TXN)
+                + schemaChangeHandler.getAlterJobV2Num(org.apache.doris.alter.AlterJobV2.JobState.RUNNING);
+        finishedNum = schemaChangeHandler.getAlterJobV2Num(org.apache.doris.alter.AlterJobV2.JobState.FINISHED);
+        cancelledNum = schemaChangeHandler.getAlterJobV2Num(org.apache.doris.alter.AlterJobV2.JobState.CANCELLED);
+        totalNum = pendingNum + runningNum + finishedNum + cancelledNum;
+        result.addRow(Lists.newArrayList(SCHEMA_CHANGE, pendingNum.toString(), runningNum.toString(),
+                finishedNum.toString(), cancelledNum.toString(), totalNum.toString()));
+
+        // export
+        ExportMgr exportMgr = Env.getCurrentEnv().getExportMgr();
+        pendingNum = exportMgr.getJobNum(ExportJob.JobState.PENDING);
+        runningNum = exportMgr.getJobNum(ExportJob.JobState.EXPORTING);
+        finishedNum = exportMgr.getJobNum(ExportJob.JobState.FINISHED);
+        cancelledNum = exportMgr.getJobNum(ExportJob.JobState.CANCELLED);
+        totalNum = pendingNum + runningNum + finishedNum + cancelledNum;
+        result.addRow(Lists.newArrayList(EXPORT, pendingNum.toString(), runningNum.toString(), finishedNum.toString(),
+                cancelledNum.toString(), totalNum.toString()));
+
+        return result;
+    }
 }
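Every branch of fetchResultForAllDbs has the same shape: fetch four per-state counts, derive the total, and emit one row of strings. The shape sketched in isolation below; the State enum and the countFor supplier are hypothetical stand-ins for the various Doris managers.

import java.util.ArrayList;
import java.util.List;
import java.util.function.ToLongFunction;

public class JobRowSketch {
    enum State { PENDING, RUNNING, FINISHED, CANCELLED }

    // Builds one proc row: type, the per-state counts, and the derived total.
    static List<String> buildRow(String jobType, ToLongFunction<State> countFor) {
        long pending = countFor.applyAsLong(State.PENDING);
        long running = countFor.applyAsLong(State.RUNNING);
        long finished = countFor.applyAsLong(State.FINISHED);
        long cancelled = countFor.applyAsLong(State.CANCELLED);
        long total = pending + running + finished + cancelled;
        List<String> row = new ArrayList<>();
        row.add(jobType);
        for (long n : new long[] {pending, running, finished, cancelled, total}) {
            row.add(Long.toString(n));
        }
        return row;
    }

    public static void main(String[] args) {
        // Fake counter standing in for LoadManager/ExportMgr/etc.
        System.out.println(buildRow("load", s -> s == State.FINISHED ? 3L : 1L));
        // -> [load, 1, 1, 3, 1, 6]
    }
}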
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/LoadJobProcNode.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/LoadJobProcNode.java
index 194349f2c6..4b3056ab6c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/LoadJobProcNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/LoadJobProcNode.java
@@ -18,13 +18,10 @@
 package org.apache.doris.common.proc;
 
 import org.apache.doris.common.AnalysisException;
-import org.apache.doris.load.Load;
+import org.apache.doris.load.loadv2.LoadManager;
 
 import com.google.common.collect.ImmutableList;
 
-import java.util.ArrayList;
-import java.util.List;
-
 public class LoadJobProcNode implements ProcNodeInterface {
 
     public static final ImmutableList<String> TITLE_NAMES = new ImmutableList.Builder<String>()
@@ -32,11 +29,11 @@ public class LoadJobProcNode implements ProcNodeInterface {
             .add("PartitionId").add("LoadVersion")
             .build();
 
-    private Load load;
+    private LoadManager loadManager;
     private long jobId;
 
-    public LoadJobProcNode(Load load, long jobId) {
-        this.load = load;
+    public LoadJobProcNode(LoadManager loadManager, long jobId) {
+        this.loadManager = loadManager;
         this.jobId = jobId;
     }
 
@@ -45,17 +42,7 @@ public class LoadJobProcNode implements ProcNodeInterface {
         BaseProcResult result = new BaseProcResult();
         result.setNames(TITLE_NAMES);
 
-        List<List<Comparable>> infos = load.getLoadJobUnfinishedInfo(jobId);
-        // In this step, the detail of load job which is belongs to LoadManager will not be presented.
-        // The reason is that there are no detail info in load job which is streaming during loading.
-        // So it don't need to invoke the LoadManager here.
-        for (List<Comparable> info : infos) {
-            List<String> oneInfo = new ArrayList<String>(TITLE_NAMES.size());
-            for (Comparable element : info) {
-                oneInfo.add(element.toString());
-            }
-            result.addRow(oneInfo);
-        }
+        // TODO: fetch results from LoadManager. Before that, LoadManager must be updated to record detail info.
         return result;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/LoadProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/LoadProcDir.java
index c2cd21d815..86cd01dcc9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/LoadProcDir.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/LoadProcDir.java
@@ -18,16 +18,13 @@
 package org.apache.doris.common.proc;
 
 import org.apache.doris.catalog.Database;
-import org.apache.doris.catalog.Env;
 import org.apache.doris.common.AnalysisException;
-import org.apache.doris.load.Load;
+import org.apache.doris.load.loadv2.LoadManager;
 
-import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 
 import java.util.ArrayList;
 import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
 
 public class LoadProcDir implements ProcDirInterface {
@@ -47,30 +44,30 @@ public class LoadProcDir implements ProcDirInterface {
 
     private static final int LIMIT = 2000;
 
-    private Load load;
+    private LoadManager loadManager;
     private Database db;
 
-    public LoadProcDir(Load load, Database db) {
-        this.load = load;
+    public LoadProcDir(LoadManager loadManager, Database db) {
+        this.loadManager = loadManager;
         this.db = db;
     }
 
     @Override
     public ProcResult fetchResult() throws AnalysisException {
-        Preconditions.checkNotNull(db);
-        Preconditions.checkNotNull(load);
-
         BaseProcResult result = new BaseProcResult();
         result.setNames(TITLE_NAMES);
 
-        // merge load job from load and loadManager
-        LinkedList<List<Comparable>> loadJobInfos = load.getLoadJobInfosByDb(db.getId(), db.getFullName(),
-                null, false, null);
-        loadJobInfos.addAll(Env.getCurrentEnv().getLoadManager().getLoadJobInfosByDb(db.getId(), null,
-                false,
-                null));
+        List<List<Comparable>> loadJobInfos;
+
+        // db is null means we need the total result of all databases
+        if (db == null) {
+            loadJobInfos = loadManager.getAllLoadJobInfos();
+        } else {
+            loadJobInfos = loadManager.getLoadJobInfosByDb(db.getId(), null, false, null);
+        }
+
         int counter = 0;
-        Iterator<List<Comparable>> iterator = loadJobInfos.descendingIterator();
+        Iterator<List<Comparable>> iterator = loadJobInfos.iterator();
         while (iterator.hasNext()) {
             List<Comparable> infoStr = iterator.next();
             List<String> oneInfo = new ArrayList<String>(TITLE_NAMES.size());
@@ -99,7 +96,7 @@ public class LoadProcDir implements ProcDirInterface {
             throw new AnalysisException("Invalid job id format: " + jobIdStr);
         }
 
-        return new LoadJobProcNode(loadManager, jobId);
     }
 
     public static int analyzeColumn(String columnName) throws AnalysisException {
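LoadProcDir.fetchResult stringifies each Comparable cell and stops at the LIMIT cap so a busy cluster cannot produce an unbounded proc result. A small self-contained sketch of that bounded conversion loop (LIMIT is shrunk here for the demo; Doris uses 2000):

import java.util.ArrayList;
import java.util.List;

public class BoundedRowsSketch {
    private static final int LIMIT = 3; // demo value; the FE uses 2000

    static List<List<String>> toStringRows(List<List<Comparable>> rows) {
        List<List<String>> out = new ArrayList<>();
        int counter = 0;
        for (List<Comparable> row : rows) {
            List<String> oneInfo = new ArrayList<>(row.size());
            for (Comparable cell : row) {
                oneInfo.add(cell.toString()); // stringify every cell for display
            }
            out.add(oneInfo);
            if (++counter >= LIMIT) {
                break; // cap the result size
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<Comparable>> rows = new ArrayList<>();
        for (long i = 1; i <= 5; i++) {
            rows.add(List.of(i, "job-" + i));
        }
        System.out.println(toStringRows(rows).size()); // 3
    }
}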
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/RollupProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/RollupProcDir.java
index fb65f5231c..896d6349be 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/RollupProcDir.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/RollupProcDir.java
@@ -62,13 +62,18 @@ public class RollupProcDir implements ProcDirInterface {
 
     @Override
     public ProcResult fetchResult() throws AnalysisException {
-        Preconditions.checkNotNull(db);
         Preconditions.checkNotNull(materializedViewHandler);
 
         BaseProcResult result = new BaseProcResult();
         result.setNames(TITLE_NAMES);
 
-        List<List<Comparable>> rollupJobInfos = materializedViewHandler.getAlterJobInfosByDb(db);
+        List<List<Comparable>> rollupJobInfos;
+        // db is null means we need the total result of all databases
+        if (db == null) {
+            rollupJobInfos = materializedViewHandler.getAllAlterJobInfos();
+        } else {
+            rollupJobInfos = materializedViewHandler.getAlterJobInfosByDb(db);
+        }
         for (List<Comparable> infoStr : rollupJobInfos) {
             List<String> oneInfo = new ArrayList<String>(TITLE_NAMES.size());
             for (Comparable element : infoStr) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/SchemaChangeProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/SchemaChangeProcDir.java
index 6bf05a02ae..d89792ddf5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/SchemaChangeProcDir.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/SchemaChangeProcDir.java
@@ -176,13 +176,18 @@ public class SchemaChangeProcDir implements ProcDirInterface {
 
     @Override
     public ProcResult fetchResult() throws AnalysisException {
-        Preconditions.checkNotNull(db);
         Preconditions.checkNotNull(schemaChangeHandler);
 
         BaseProcResult result = new BaseProcResult();
         result.setNames(TITLE_NAMES);
 
-        List<List<Comparable>> schemaChangeJobInfos = schemaChangeHandler.getAlterJobInfosByDb(db);
+        List<List<Comparable>> schemaChangeJobInfos;
+        // db is null means we need the total result of all databases
+        if (db == null) {
+            schemaChangeJobInfos = schemaChangeHandler.getAllAlterJobInfos();
+        } else {
+            schemaChangeJobInfos = schemaChangeHandler.getAlterJobInfosByDb(db);
+        }
         for (List<Comparable> infoStr : schemaChangeJobInfos) {
             List<String> oneInfo = new ArrayList<String>(TITLE_NAMES.size());
             for (Comparable element : infoStr) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/DeleteHandler.java b/fe/fe-core/src/main/java/org/apache/doris/load/DeleteHandler.java
index 3e8d6c6318..bd793e0808 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/DeleteHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/DeleteHandler.java
@@ -833,7 +833,20 @@ public class DeleteHandler implements Writable {
         }
 
         String dbName = db.getFullName();
-        List<DeleteInfo> deleteInfoList = dbToDeleteInfos.get(dbId);
+        List<DeleteInfo> deleteInfoList = new ArrayList<>();
+        if (dbId == -1) {
+            for (Long tempDbId : dbToDeleteInfos.keySet()) {
+                if (!Env.getCurrentEnv().getAccessManager().checkDbPriv(ConnectContext.get(),
+                        Env.getCurrentEnv().getCatalogMgr().getDbNullable(tempDbId).getFullName(),
+                        PrivPredicate.LOAD)) {
+                    continue;
+                }
+
+                deleteInfoList.addAll(dbToDeleteInfos.get(tempDbId));
+            }
+        } else {
+            deleteInfoList = dbToDeleteInfos.get(dbId);
+        }
 
         readLock();
         try {
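DeleteHandler, like the other aggregating paths in this change, walks every database, silently skips the ones the current user cannot read, and merges the rest. The same pattern in isolation, with a plain Predicate standing in for Doris's AccessManager.checkDbPriv:

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

public class PrivFilteredMergeSketch {
    // Merge per-db lists, keeping only databases the caller may read.
    static <T> List<T> mergeVisible(Map<Long, List<T>> perDb, Predicate<Long> canSeeDb) {
        List<T> merged = new ArrayList<>();
        for (Map.Entry<Long, List<T>> e : perDb.entrySet()) {
            if (!canSeeDb.test(e.getKey())) {
                continue; // mimic the priv check: silently skip invisible dbs
            }
            merged.addAll(e.getValue());
        }
        return merged;
    }

    public static void main(String[] args) {
        Map<Long, List<String>> perDb = Map.of(
                1L, List.of("delete-a"),
                2L, List.of("delete-b", "delete-c"));
        // Pretend the user can only see db 2.
        System.out.println(mergeVisible(perDb, dbId -> dbId == 2L));
        // -> [delete-b, delete-c]
    }
}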
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java b/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java
index a6862a9b2b..a56337ce9b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java
@@ -223,77 +223,16 @@ public class ExportMgr {
                 }
             }
 
-            // check auth
-            TableName tableName = job.getTableName();
-            if (tableName == null || tableName.getTbl().equals("DUMMY")) {
-                // forward compatibility, no table name is saved before
-                Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
-                if (db == null) {
-                    continue;
-                }
-                if (!Env.getCurrentEnv().getAccessManager().checkDbPriv(ConnectContext.get(),
-                        db.getFullName(), PrivPredicate.SHOW)) {
-                    continue;
-                }
-            } else {
-                if (!Env.getCurrentEnv().getAccessManager().checkTblPriv(ConnectContext.get(),
-                        tableName.getDb(), tableName.getTbl(),
-                        PrivPredicate.SHOW)) {
-                    continue;
-                }
-            }
-
             if (states != null) {
                 if (!states.contains(state)) {
                     continue;
                 }
             }
 
-            List<Comparable> jobInfo = new ArrayList<Comparable>();
-
-            jobInfo.add(id);
-            jobInfo.add(jobLabel);
-            jobInfo.add(state.name());
-            jobInfo.add(job.getProgress() + "%");
-
-            // task infos
-            Map<String, Object> infoMap = Maps.newHashMap();
-            List<String> partitions = job.getPartitions();
-            if (partitions == null) {
-                partitions = Lists.newArrayList();
-                partitions.add("*");
+            // check auth
+            if (isJobShowable(job)) {
+                exportJobInfos.add(composeExportJobInfo(job));
             }
-            infoMap.put("db", job.getTableName().getDb());
-            infoMap.put("tbl", job.getTableName().getTbl());
-            if (job.getWhereExpr() != null) {
-                infoMap.put("where expr", job.getWhereExpr().toMySql());
-            }
-            infoMap.put("partitions", partitions);
-            infoMap.put("broker", job.getBrokerDesc().getName());
-            infoMap.put("column separator", job.getColumnSeparator());
-            infoMap.put("line delimiter", job.getLineDelimiter());
-            infoMap.put("exec mem limit", job.getExecMemLimit());
-            infoMap.put("columns", job.getColumns());
-            infoMap.put("coord num", job.getCoordList().size());
-            infoMap.put("tablet num", job.getTabletLocations() == null ? -1 : job.getTabletLocations().size());
-            jobInfo.add(new Gson().toJson(infoMap));
-            // path
-            jobInfo.add(job.getShowExportPath());
-
-            jobInfo.add(TimeUtils.longToTimeString(job.getCreateTimeMs()));
-            jobInfo.add(TimeUtils.longToTimeString(job.getStartTimeMs()));
-            jobInfo.add(TimeUtils.longToTimeString(job.getFinishTimeMs()));
-            jobInfo.add(job.getTimeoutSecond());
-
-            // error msg
-            if (job.getState() == ExportJob.JobState.CANCELLED) {
-                ExportFailMsg failMsg = job.getFailMsg();
-                jobInfo.add("type:" + failMsg.getCancelType() + "; msg:" + failMsg.getMsg());
-            } else {
-                jobInfo.add(FeConstants.null_string);
-            }
-
-            exportJobInfos.add(jobInfo);
 
             if (++counter >= resultNum) {
                 break;
@@ -322,6 +261,112 @@ public class ExportMgr {
         return results;
     }
 
+    public List<List<String>> getExportJobInfos(long limit) {
+        long resultNum = limit == -1L ? Integer.MAX_VALUE : limit;
+        LinkedList<List<Comparable>> exportJobInfos = new LinkedList<List<Comparable>>();
+
+        readLock();
+        try {
+            int counter = 0;
+            for (ExportJob job : idToJob.values()) {
+                // check auth
+                if (isJobShowable(job)) {
+                    exportJobInfos.add(composeExportJobInfo(job));
+                }
+
+                if (++counter >= resultNum) {
+                    break;
+                }
+            }
+        } finally {
+            readUnlock();
+        }
+
+        // order by
+        ListComparator<List<Comparable>> comparator = null;
+        // sort by id asc
+        comparator = new ListComparator<List<Comparable>>(0);
+        Collections.sort(exportJobInfos, comparator);
+
+        List<List<String>> results = Lists.newArrayList();
+        for (List<Comparable> list : exportJobInfos) {
+            results.add(list.stream().map(e -> e.toString()).collect(Collectors.toList()));
+        }
+
+        return results;
+    }
+
+    public boolean isJobShowable(ExportJob job) {
+        TableName tableName = job.getTableName();
+        if (tableName == null || tableName.getTbl().equals("DUMMY")) {
+            // forward compatibility, no table name is saved before
+            Database db = Env.getCurrentInternalCatalog().getDbNullable(job.getDbId());
+            if (db == null) {
+                return false;
+            }
+            if (!Env.getCurrentEnv().getAccessManager().checkDbPriv(ConnectContext.get(),
+                    db.getFullName(), PrivPredicate.SHOW)) {
+                return false;
+            }
+        } else {
+            if (!Env.getCurrentEnv().getAccessManager().checkTblPriv(ConnectContext.get(),
+                    tableName.getDb(), tableName.getTbl(),
+                    PrivPredicate.SHOW)) {
+                return false;
+            }
+        }
+
+        return true;
+    }
+
+    private List<Comparable> composeExportJobInfo(ExportJob job) {
+        List<Comparable> jobInfo = new ArrayList<Comparable>();
+
+        jobInfo.add(job.getId());
+        jobInfo.add(job.getLabel());
+        jobInfo.add(job.getState().name());
+        jobInfo.add(job.getProgress() + "%");
+
+        // task infos
+        Map<String, Object> infoMap = Maps.newHashMap();
+        List<String> partitions = job.getPartitions();
+        if (partitions == null) {
+            partitions = Lists.newArrayList();
+            partitions.add("*");
+        }
+        infoMap.put("db", job.getTableName().getDb());
+        infoMap.put("tbl", job.getTableName().getTbl());
+        if (job.getWhereExpr() != null) {
+            infoMap.put("where expr", job.getWhereExpr().toMySql());
+        }
+        infoMap.put("partitions", partitions);
+        infoMap.put("broker", job.getBrokerDesc().getName());
+        infoMap.put("column separator", job.getColumnSeparator());
+        infoMap.put("line delimiter", job.getLineDelimiter());
+        infoMap.put("exec mem limit", job.getExecMemLimit());
+        infoMap.put("columns", job.getColumns());
+        infoMap.put("coord num", job.getCoordList().size());
+        infoMap.put("tablet num", job.getTabletLocations() == null ? -1 : job.getTabletLocations().size());
+        jobInfo.add(new Gson().toJson(infoMap));
+        // path
+        jobInfo.add(job.getShowExportPath());
+
+        jobInfo.add(TimeUtils.longToTimeString(job.getCreateTimeMs()));
+        jobInfo.add(TimeUtils.longToTimeString(job.getStartTimeMs()));
+        jobInfo.add(TimeUtils.longToTimeString(job.getFinishTimeMs()));
+        jobInfo.add(job.getTimeoutSecond());
+
+        // error msg
+        if (job.getState() == ExportJob.JobState.CANCELLED) {
+            ExportFailMsg failMsg = job.getFailMsg();
+            jobInfo.add("type:" + failMsg.getCancelType() + "; msg:" + failMsg.getMsg());
+        } else {
+            jobInfo.add(FeConstants.null_string);
+        }
+
+        return jobInfo;
+    }
+
     public void removeOldExportJobs() {
         long currentTimeMs = System.currentTimeMillis();
 
@@ -376,4 +421,25 @@ public class ExportMgr {
         }
         return size;
     }
+
+    public long getJobNum(ExportJob.JobState state) {
+        int size = 0;
+        readLock();
+        try {
+            for (ExportJob job : idToJob.values()) {
+                if (!Env.getCurrentEnv().getAccessManager().checkDbPriv(ConnectContext.get(),
+                        Env.getCurrentEnv().getCatalogMgr().getDbNullable(job.getDbId()).getFullName(),
+                        PrivPredicate.LOAD)) {
+                    continue;
+                }
+
+                if (job.getState() == state) {
+                    ++size;
+                }
+            }
+        } finally {
+            readUnlock();
+        }
+        return size;
+    }
 }
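composeExportJobInfo packs the heterogeneous task details into a single JSON column via Gson, which keeps the proc schema stable while the payload evolves. A minimal sketch of that serialization, assuming Gson on the classpath as in the FE (the values are illustrative, and map key order in the output is not guaranteed):

import com.google.gson.Gson;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TaskInfoJsonSketch {
    public static void main(String[] args) {
        // Heterogeneous values all collapse into one JSON column.
        Map<String, Object> infoMap = new HashMap<>();
        infoMap.put("db", "example_db");       // illustrative values
        infoMap.put("tbl", "example_tbl");
        infoMap.put("partitions", List.of("*"));
        infoMap.put("exec mem limit", 2147483648L);
        String json = new Gson().toJson(infoMap);
        System.out.println(json);
        // e.g. {"db":"example_db","tbl":"example_tbl","partitions":["*"],"exec mem limit":2147483648}
    }
}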
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java
index c6d08b3054..25a17f5865 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java
@@ -1331,6 +1331,31 @@ public class Load {
         }
     }
 
+    public long getLoadJobNum(JobState jobState) {
+        readLock();
+        try {
+            List<LoadJob> loadJobs = new ArrayList<>();
+            for (Long dbId : dbToLoadJobs.keySet()) {
+                if (!Env.getCurrentEnv().getAccessManager().checkDbPriv(ConnectContext.get(),
+                        Env.getCurrentEnv().getCatalogMgr().getDbNullable(dbId).getFullName(),
+                        PrivPredicate.LOAD)) {
+                    continue;
+                }
+                loadJobs.addAll(this.dbToLoadJobs.get(dbId));
+            }
+
+            int jobNum = 0;
+            for (LoadJob job : loadJobs) {
+                if (job.getState() == jobState) {
+                    ++jobNum;
+                }
+            }
+            return jobNum;
+        } finally {
+            readUnlock();
+        }
+    }
+
     public LoadJob getLoadJob(long jobId) {
         readLock();
         try {
@@ -1340,6 +1365,151 @@ public class Load {
         }
     }
 
+    public LinkedList<List<Comparable>> getAllLoadJobInfos() {
+        LinkedList<List<Comparable>> loadJobInfos = new LinkedList<List<Comparable>>();
+        readLock();
+        try {
+            List<LoadJob> loadJobs = new ArrayList<>();
+            for (Long dbId : dbToLoadJobs.keySet()) {
+                if (!Env.getCurrentEnv().getAccessManager().checkDbPriv(ConnectContext.get(),
+                        Env.getCurrentEnv().getCatalogMgr().getDbNullable(dbId).getFullName(),
+                        PrivPredicate.LOAD)) {
+                    continue;
+                }
+
+                loadJobs.addAll(this.dbToLoadJobs.get(dbId));
+            }
+            if (loadJobs.size() == 0) {
+                return loadJobInfos;
+            }
+
+            long start = System.currentTimeMillis();
+            LOG.debug("begin to get load job info, size: {}", loadJobs.size());
+
+            for (LoadJob loadJob : loadJobs) {
+                // filter first
+                String dbName = Env.getCurrentEnv().getCatalogMgr().getDbNullable(loadJob.getDbId()).getFullName();
+                // check auth
+                Set<String> tableNames = loadJob.getTableNames();
+                boolean auth = true;
+                for (String tblName : tableNames) {
+                    if (!Env.getCurrentEnv().getAccessManager().checkTblPriv(ConnectContext.get(), dbName,
+                            tblName, PrivPredicate.LOAD)) {
+                        auth = false;
+                        break;
+                    }
+                }
+                if (!auth) {
+                    continue;
+                }
+
+                loadJobInfos.add(composeJobInfoByLoadJob(loadJob));
+            } // end for loadJobs
+
+            LOG.debug("finished to get load job info, cost: {}", (System.currentTimeMillis() - start));
+        } finally {
+            readUnlock();
+        }
+
+        return loadJobInfos;
+    }
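Both new Load methods follow the FE's locking convention: take the read lock, do all shared-state traversal inside try, and release in finally so an exception cannot leak the lock. The idiom in miniature with a plain ReentrantReadWriteLock (the FE wraps the lock in helper methods of its own):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ReadLockSketch {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final List<String> jobs = new ArrayList<>();

    private void readLock() {
        lock.readLock().lock();
    }

    private void readUnlock() {
        lock.readLock().unlock();
    }

    public int countJobs() {
        readLock();
        try {
            // All shared-state reads happen while the lock is held.
            return jobs.size();
        } finally {
            readUnlock(); // always released, even on exception
        }
    }

    public static void main(String[] args) {
        System.out.println(new ReadLockSketch().countJobs()); // 0
    }
}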
+
+    private List<Comparable> composeJobInfoByLoadJob(LoadJob loadJob) {
+        List<Comparable> jobInfo = new ArrayList<Comparable>();
+
+        // jobId
+        jobInfo.add(loadJob.getId());
+        // label
+        jobInfo.add(loadJob.getLabel());
+        // state
+        jobInfo.add(loadJob.getState().name());
+
+        // progress
+        switch (loadJob.getState()) {
+            case PENDING:
+                jobInfo.add("ETL:0%; LOAD:0%");
+                break;
+            case ETL:
+                jobInfo.add("ETL:" + loadJob.getProgress() + "%; LOAD:0%");
+                break;
+            case LOADING:
+                jobInfo.add("ETL:100%; LOAD:" + loadJob.getProgress() + "%");
+                break;
+            case QUORUM_FINISHED:
+            case FINISHED:
+                jobInfo.add("ETL:100%; LOAD:100%");
+                break;
+            case CANCELLED:
+            default:
+                jobInfo.add("ETL:N/A; LOAD:N/A");
+                break;
+        }
+
+        // type
+        jobInfo.add(loadJob.getEtlJobType().name());
+
+        // etl info
+        EtlStatus status = loadJob.getEtlJobStatus();
+        if (status == null || status.getState() == TEtlState.CANCELLED) {
+            jobInfo.add(FeConstants.null_string);
+        } else {
+            Map<String, String> counters = status.getCounters();
+            List<String> info = Lists.newArrayList();
+            for (String key : counters.keySet()) {
+                // XXX: internal etl jobs return all counters
+                if (key.equalsIgnoreCase("HDFS bytes read")
+                        || key.equalsIgnoreCase("Map input records")
+                        || key.startsWith("dpp.")
+                        || loadJob.getEtlJobType() == EtlJobType.MINI) {
+                    info.add(key + "=" + counters.get(key));
+                }
+            } // end for counters
+            if (info.isEmpty()) {
+                jobInfo.add(FeConstants.null_string);
+            } else {
+                jobInfo.add(StringUtils.join(info, "; "));
+            }
+        }
+
+        // task info
+        jobInfo.add("cluster:" + loadJob.getHadoopCluster()
+                + "; timeout(s):" + loadJob.getTimeoutSecond()
+                + "; max_filter_ratio:" + loadJob.getMaxFilterRatio());
+
+        // error msg
+        if (loadJob.getState() == JobState.CANCELLED) {
+            FailMsg failMsg = loadJob.getFailMsg();
+            jobInfo.add("type:" + failMsg.getCancelType() + "; msg:" + failMsg.getMsg());
+        } else {
+            jobInfo.add(FeConstants.null_string);
+        }
+
+        // create time
+        jobInfo.add(TimeUtils.longToTimeString(loadJob.getCreateTimeMs()));
+        // etl start time
+        jobInfo.add(TimeUtils.longToTimeString(loadJob.getEtlStartTimeMs()));
+        // etl end time
+        jobInfo.add(TimeUtils.longToTimeString(loadJob.getEtlFinishTimeMs()));
+        // load start time
+        jobInfo.add(TimeUtils.longToTimeString(loadJob.getLoadStartTimeMs()));
+        // load end time
+        jobInfo.add(TimeUtils.longToTimeString(loadJob.getLoadFinishTimeMs()));
+        // tracking url
+        jobInfo.add(status.getTrackingUrl());
+        // job detail (not used for hadoop load, just return an empty string)
+        jobInfo.add("");
+        // transaction id
+        jobInfo.add(loadJob.getTransactionId());
+        // error tablets (not used for hadoop load, just return an empty string)
+        jobInfo.add("");
+        // user
+        jobInfo.add(loadJob.getUser());
+        // comment
+        jobInfo.add(loadJob.getComment());
+
+        return jobInfo;
+    }
+
     public LinkedList<List<Comparable>> getLoadJobInfosByDb(long dbId, String dbName, String labelValue,
             boolean accurateMatch, Set<String> states) throws AnalysisException {
         LinkedList<List<Comparable>> loadJobInfos = new LinkedList<List<Comparable>>();
jobInfo.add("ETL:100%; LOAD:100%"); - break; - case CANCELLED: - jobInfo.add("ETL:N/A; LOAD:N/A"); - break; - default: - jobInfo.add("ETL:N/A; LOAD:N/A"); - break; - } - - // type - jobInfo.add(loadJob.getEtlJobType().name()); - - // etl info - EtlStatus status = loadJob.getEtlJobStatus(); - if (status == null || status.getState() == TEtlState.CANCELLED) { - jobInfo.add(FeConstants.null_string); - } else { - Map counters = status.getCounters(); - List info = Lists.newArrayList(); - for (String key : counters.keySet()) { - // XXX: internal etl job return all counters - if (key.equalsIgnoreCase("HDFS bytes read") - || key.equalsIgnoreCase("Map input records") - || key.startsWith("dpp.") - || loadJob.getEtlJobType() == EtlJobType.MINI) { - info.add(key + "=" + counters.get(key)); - } - } // end for counters - if (info.isEmpty()) { - jobInfo.add(FeConstants.null_string); - } else { - jobInfo.add(StringUtils.join(info, "; ")); - } - } - - // task info - jobInfo.add("cluster:" + loadJob.getHadoopCluster() - + "; timeout(s):" + loadJob.getTimeoutSecond() - + "; max_filter_ratio:" + loadJob.getMaxFilterRatio()); - - // error msg - if (loadJob.getState() == JobState.CANCELLED) { - FailMsg failMsg = loadJob.getFailMsg(); - jobInfo.add("type:" + failMsg.getCancelType() + "; msg:" + failMsg.getMsg()); - } else { - jobInfo.add(FeConstants.null_string); - } - - // create time - jobInfo.add(TimeUtils.longToTimeString(loadJob.getCreateTimeMs())); - // etl start time - jobInfo.add(TimeUtils.longToTimeString(loadJob.getEtlStartTimeMs())); - // etl end time - jobInfo.add(TimeUtils.longToTimeString(loadJob.getEtlFinishTimeMs())); - // load start time - jobInfo.add(TimeUtils.longToTimeString(loadJob.getLoadStartTimeMs())); - // load end time - jobInfo.add(TimeUtils.longToTimeString(loadJob.getLoadFinishTimeMs())); - // tracking url - jobInfo.add(status.getTrackingUrl()); - // job detail(not used for hadoop load, just return an empty string) - jobInfo.add(""); - // transaction id - jobInfo.add(loadJob.getTransactionId()); - // error tablets(not used for hadoop load, just return an empty string) - jobInfo.add(""); - // user - jobInfo.add(loadJob.getUser()); - // comment - jobInfo.add(loadJob.getComment()); - - loadJobInfos.add(jobInfo); + loadJobInfos.add(composeJobInfoByLoadJob(loadJob)); } // end for loadJobs LOG.debug("finished to get load job info, cost: {}", (System.currentTimeMillis() - start)); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java index 1eb5d24a75..2457dacdca 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java @@ -43,7 +43,9 @@ import org.apache.doris.load.EtlJobType; import org.apache.doris.load.FailMsg; import org.apache.doris.load.FailMsg.CancelType; import org.apache.doris.load.Load; +import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.persist.CleanLabelOperationLog; +import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.OriginStatement; import org.apache.doris.thrift.TUniqueId; import org.apache.doris.transaction.DatabaseTransactionMgr; @@ -66,6 +68,7 @@ import java.lang.reflect.Method; import java.util.ArrayList; import java.util.Collection; import java.util.EnumSet; +import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; import java.util.List; @@ -345,6 +348,30 @@ public class LoadManager implements 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
index 1eb5d24a75..2457dacdca 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
@@ -43,7 +43,9 @@ import org.apache.doris.load.EtlJobType;
 import org.apache.doris.load.FailMsg;
 import org.apache.doris.load.FailMsg.CancelType;
 import org.apache.doris.load.Load;
+import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.persist.CleanLabelOperationLog;
+import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.OriginStatement;
 import org.apache.doris.thrift.TUniqueId;
 import org.apache.doris.transaction.DatabaseTransactionMgr;
@@ -66,6 +68,7 @@ import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.EnumSet;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
@@ -345,6 +348,30 @@ public class LoadManager implements Writable {
         }
     }
 
+    /**
+     * Get load job num, used by proc.
+     **/
+    public int getLoadJobNum(JobState jobState) {
+        readLock();
+        try {
+            Map<String, List<LoadJob>> labelToLoadJobs = new HashMap<>();
+            for (Long dbId : dbIdToLabelToLoadJobs.keySet()) {
+                if (!Env.getCurrentEnv().getAccessManager().checkDbPriv(ConnectContext.get(),
+                        Env.getCurrentEnv().getCatalogMgr().getDbNullable(dbId).getFullName(),
+                        PrivPredicate.LOAD)) {
+                    continue;
+                }
+
+                labelToLoadJobs.putAll(dbIdToLabelToLoadJobs.get(dbId));
+            }
+
+            List<LoadJob> loadJobList =
+                    labelToLoadJobs.values().stream().flatMap(entity -> entity.stream()).collect(Collectors.toList());
+            return (int) loadJobList.stream().filter(entity -> entity.getState() == jobState).count();
+        } finally {
+            readUnlock();
+        }
+    }
 
     /**
      * Get load job num, used by metric.
@@ -538,6 +565,40 @@ public class LoadManager implements Writable {
         }
     }
 
+    public List<List<Comparable>> getAllLoadJobInfos() {
+        LinkedList<List<Comparable>> loadJobInfos = new LinkedList<List<Comparable>>();
+
+        readLock();
+        try {
+            Map<String, List<LoadJob>> labelToLoadJobs = new HashMap<>();
+            for (Long dbId : dbIdToLabelToLoadJobs.keySet()) {
+                if (!Env.getCurrentEnv().getAccessManager().checkDbPriv(ConnectContext.get(),
+                        Env.getCurrentEnv().getCatalogMgr().getDbNullable(dbId).getFullName(),
+                        PrivPredicate.LOAD)) {
+                    continue;
+                }
+
+                labelToLoadJobs.putAll(dbIdToLabelToLoadJobs.get(dbId));
+            }
+            List<LoadJob> loadJobList = Lists.newArrayList();
+            loadJobList.addAll(
+                    labelToLoadJobs.values().stream().flatMap(Collection::stream).collect(Collectors.toList()));
+
+            // check state
+            for (LoadJob loadJob : loadJobList) {
+                try {
+                    // add load job info
+                    loadJobInfos.add(loadJob.getShowInfo());
+                } catch (DdlException e) {
+                    continue;
+                }
+            }
+            return loadJobInfos;
+        } finally {
+            readUnlock();
+        }
+    }
+
     /**
      * Get load job info.
     **/
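LoadManager.getLoadJobNum flattens the nested dbId -> label -> jobs structure with flatMap before filtering on state. The same stream shape in isolation, using plain JDK collections with strings standing in for LoadJob:

import java.util.Collection;
import java.util.List;
import java.util.Map;

public class FlatMapCountSketch {
    public static void main(String[] args) {
        // dbId -> label -> jobs, mirroring dbIdToLabelToLoadJobs
        Map<Long, Map<String, List<String>>> dbIdToLabelToJobs = Map.of(
                1L, Map.of("label_a", List.of("FINISHED", "LOADING")),
                2L, Map.of("label_b", List.of("FINISHED")));

        long finished = dbIdToLabelToJobs.values().stream()
                .map(Map::values)                 // each db's label -> jobs map
                .flatMap(Collection::stream)      // all job lists
                .flatMap(Collection::stream)      // all jobs
                .filter(state -> state.equals("FINISHED"))
                .count();

        System.out.println(finished); // 2
    }
}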