[Improvement](meta) support returning total statistics of all databases for command show proc '/jobs' (#17342)

Currently, the show proc '/jobs' command can only be used on a specific database. If a user wants to see the overall data of the whole cluster, they have to look into every database and sum the results up, which is troublesome. Now the cluster-wide totals can be obtained simply by passing -1 as the dbId.

mysql> show proc '/jobs/-1';
+---------------+---------+---------+----------+-----------+-------+
| JobType       | Pending | Running | Finished | Cancelled | Total |
+---------------+---------+---------+----------+-----------+-------+
| load          | 0       | 0       | 0        | 2         | 2     |
| delete        | 0       | 0       | 0        | 0         | 0     |
| rollup        | 0       | 0       | 1        | 0         | 1     |
| schema_change | 0       | 0       | 2        | 0         | 2     |
| export        | 0       | 0       | 0        | 3         | 3     |
+---------------+---------+---------+----------+-----------+-------+

mysql> show proc '/jobs/-1/rollup';
+----------+------------------+---------------------+---------------------+------------------+-----------------+----------+---------------+----------+------+----------+---------+
| JobId    | TableName        | CreateTime          | FinishTime          | BaseIndexName    | RollupIndexName | RollupId | TransactionId | State    | Msg  | Progress | Timeout |
+----------+------------------+---------------------+---------------------+------------------+-----------------+----------+---------------+----------+------+----------+---------+
| 17826065 | order_detail     | 2023-02-23 04:21:01 | 2023-02-23 04:21:22 | order_detail     | rp1             | 17826066 | 6009          | FINISHED |      | NULL     | 2592000 |
+----------+------------------+---------------------+---------------------+------------------+-----------------+----------+---------------+----------+------+----------+---------+
1 row in set (0.01 sec)
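
For comparison, the existing per-database form keeps working as before; the dbId below (10003) is only a placeholder for a real database id (which can typically be found via show proc '/dbs'), and the output columns are the same as in the cluster-wide table above:

mysql> show proc '/jobs/10003';
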
Author: Yulei-Yang
Date: 2023-03-07 08:57:55 +08:00
Committed by: GitHub
Parent: 440cf526c8
Commit: 50bf02024a
16 changed files with 533 additions and 223 deletions

AlterHandler.java

@ -140,7 +140,16 @@ public abstract class AlterHandler extends MasterDaemon {
}
public Long getAlterJobV2Num(org.apache.doris.alter.AlterJobV2.JobState state) {
return alterJobsV2.values().stream().filter(e -> e.getJobState() == state).count();
Long counter = 0L;
for (AlterJobV2 job : alterJobsV2.values()) {
// no need to check priv here. This method is only called in show proc stmt,
// which already check the ADMIN priv.
if (job.getJobState() == state) {
counter++;
}
}
return counter;
}
@Override

MaterializedViewHandler.java

@ -65,6 +65,7 @@ import org.apache.doris.thrift.TStorageMedium;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
@ -1146,6 +1147,18 @@ public class MaterializedViewHandler extends AlterHandler {
return rollupJobInfos;
}
public List<List<Comparable>> getAllAlterJobInfos() {
List<List<Comparable>> rollupJobInfos = new LinkedList<List<Comparable>>();
for (AlterJobV2 alterJob : ImmutableList.copyOf(alterJobsV2.values())) {
// no need to check priv here. This method is only called in show proc stmt,
// which already check the ADMIN priv.
alterJob.getInfo(rollupJobInfos);
}
return rollupJobInfos;
}
private void getAlterJobV2Infos(Database db, List<List<Comparable>> rollupJobInfos) {
ConnectContext ctx = ConnectContext.get();
for (AlterJobV2 alterJob : alterJobsV2.values()) {

SchemaChangeHandler.java

@ -1599,6 +1599,20 @@ public class SchemaChangeHandler extends AlterHandler {
});
}
public List<List<Comparable>> getAllAlterJobInfos() {
List<List<Comparable>> schemaChangeJobInfos = new LinkedList<>();
for (AlterJobV2 alterJob : ImmutableList.copyOf(alterJobsV2.values())) {
// no need to check priv here. This method is only called in show proc stmt,
// which already check the ADMIN priv.
alterJob.getInfo(schemaChangeJobInfos);
}
// sort by "JobId", "PartitionName", "CreateTime", "FinishTime", "IndexName", "IndexState"
ListComparator<List<Comparable>> comparator = new ListComparator<List<Comparable>>(0, 1, 2, 3, 4, 5);
schemaChangeJobInfos.sort(comparator);
return schemaChangeJobInfos;
}
@Override
public List<List<Comparable>> getAlterJobInfosByDb(Database db) {
List<List<Comparable>> schemaChangeJobInfos = new LinkedList<>();

ExportProcNode.java

@ -51,14 +51,18 @@ public class ExportProcNode implements ProcNodeInterface {
@Override
public ProcResult fetchResult() throws AnalysisException {
Preconditions.checkNotNull(db);
Preconditions.checkNotNull(exportMgr);
BaseProcResult result = new BaseProcResult();
result.setNames(TITLE_NAMES);
List<List<String>> jobInfos = exportMgr.getExportJobInfosByIdOrState(
List<List<String>> jobInfos;
if (db == null) {
jobInfos = exportMgr.getExportJobInfos(LIMIT);
} else {
jobInfos = exportMgr.getExportJobInfosByIdOrState(
db.getId(), 0, "", false, null, null, LIMIT);
}
result.setRows(jobInfos);
return result;
}

JobsDbProcDir.java

@ -60,7 +60,8 @@ public class JobsDbProcDir implements ProcDirInterface {
throw new AnalysisException("Invalid db id format: " + dbIdStr);
}
Database db = env.getInternalCatalog().getDbOrAnalysisException(dbId);
// dbId = -1 means need total result of all databases
Database db = dbId == -1 ? null : env.getInternalCatalog().getDbOrAnalysisException(dbId);
return new JobsProcDir(env, db);
}

JobsProcDir.java

@ -24,7 +24,6 @@ import org.apache.doris.catalog.Env;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.load.ExportJob;
import org.apache.doris.load.ExportMgr;
import org.apache.doris.load.Load;
import org.apache.doris.load.loadv2.LoadManager;
import com.google.common.base.Preconditions;
@ -68,9 +67,10 @@ public class JobsProcDir implements ProcDirInterface {
}
if (jobTypeName.equals(LOAD)) {
return new LoadProcDir(env.getLoadInstance(), db);
return new LoadProcDir(env.getCurrentEnv().getLoadManager(), db);
} else if (jobTypeName.equals(DELETE)) {
return new DeleteInfoProcDir(env.getDeleteHandler(), env.getLoadInstance(), db.getId());
Long dbId = db == null ? -1 : db.getId();
return new DeleteInfoProcDir(env.getDeleteHandler(), env.getLoadInstance(), dbId);
} else if (jobTypeName.equals(ROLLUP)) {
return new RollupProcDir(env.getMaterializedViewHandler(), db);
} else if (jobTypeName.equals(SCHEMA_CHANGE)) {
@ -90,23 +90,22 @@ public class JobsProcDir implements ProcDirInterface {
result.setNames(TITLE_NAMES);
// db is null means need total result of all databases
if (db == null) {
return fetchResultForAllDbs();
}
long dbId = db.getId();
// load
Load load = Env.getCurrentEnv().getLoadInstance();
LoadManager loadManager = Env.getCurrentEnv().getLoadManager();
Long pendingNum = load.getLoadJobNum(org.apache.doris.load.LoadJob.JobState.PENDING, dbId)
+ loadManager.getLoadJobNum(org.apache.doris.load.loadv2.JobState.PENDING, dbId);
Long runningNum = load.getLoadJobNum(org.apache.doris.load.LoadJob.JobState.ETL, dbId)
+ load.getLoadJobNum(org.apache.doris.load.LoadJob.JobState.LOADING, dbId)
+ loadManager.getLoadJobNum(org.apache.doris.load.loadv2.JobState.LOADING, dbId);
Long finishedNum = load.getLoadJobNum(org.apache.doris.load.LoadJob.JobState.QUORUM_FINISHED, dbId)
+ load.getLoadJobNum(org.apache.doris.load.LoadJob.JobState.FINISHED, dbId)
+ loadManager.getLoadJobNum(org.apache.doris.load.loadv2.JobState.FINISHED, dbId);
Long cancelledNum = load.getLoadJobNum(org.apache.doris.load.LoadJob.JobState.CANCELLED, dbId)
+ loadManager.getLoadJobNum(org.apache.doris.load.loadv2.JobState.CANCELLED, dbId);
Long pendingNum = new Long(loadManager.getLoadJobNum(org.apache.doris.load.loadv2.JobState.PENDING, dbId));
Long runningNum = new Long(loadManager.getLoadJobNum(org.apache.doris.load.loadv2.JobState.LOADING, dbId));
Long finishedNum = new Long(loadManager.getLoadJobNum(org.apache.doris.load.loadv2.JobState.FINISHED, dbId));
Long cancelledNum = new Long(loadManager.getLoadJobNum(org.apache.doris.load.loadv2.JobState.CANCELLED, dbId));
Long totalNum = pendingNum + runningNum + finishedNum + cancelledNum;
result.addRow(Lists.newArrayList(LOAD, pendingNum.toString(), runningNum.toString(), finishedNum.toString(),
cancelledNum.toString(), totalNum.toString()));
cancelledNum.toString(), totalNum.toString()));
// delete
// TODO: find it from delete handler
@ -155,4 +154,66 @@ public class JobsProcDir implements ProcDirInterface {
return result;
}
public ProcResult fetchResultForAllDbs() {
BaseProcResult result = new BaseProcResult();
result.setNames(TITLE_NAMES);
// load
LoadManager loadManager = Env.getCurrentEnv().getLoadManager();
Long pendingNum = new Long(loadManager.getLoadJobNum(org.apache.doris.load.loadv2.JobState.PENDING));
Long runningNum = new Long(loadManager.getLoadJobNum(org.apache.doris.load.loadv2.JobState.LOADING));
Long finishedNum = new Long(loadManager.getLoadJobNum(org.apache.doris.load.loadv2.JobState.FINISHED));
Long cancelledNum = new Long(loadManager.getLoadJobNum(org.apache.doris.load.loadv2.JobState.CANCELLED));
Long totalNum = pendingNum + runningNum + finishedNum + cancelledNum;
result.addRow(Lists.newArrayList(LOAD, pendingNum.toString(), runningNum.toString(), finishedNum.toString(),
cancelledNum.toString(), totalNum.toString()));
// delete
// TODO: find it from delete handler
pendingNum = 0L;
runningNum = 0L;
finishedNum = 0L;
cancelledNum = 0L;
totalNum = pendingNum + runningNum + finishedNum + cancelledNum;
result.addRow(Lists.newArrayList(DELETE, pendingNum.toString(), runningNum.toString(), finishedNum.toString(),
cancelledNum.toString(), totalNum.toString()));
// rollup
MaterializedViewHandler materializedViewHandler = Env.getCurrentEnv().getMaterializedViewHandler();
pendingNum = materializedViewHandler.getAlterJobV2Num(org.apache.doris.alter.AlterJobV2.JobState.PENDING);
runningNum = materializedViewHandler.getAlterJobV2Num(
org.apache.doris.alter.AlterJobV2.JobState.WAITING_TXN)
+ materializedViewHandler.getAlterJobV2Num(org.apache.doris.alter.AlterJobV2.JobState.RUNNING);
finishedNum = materializedViewHandler.getAlterJobV2Num(
org.apache.doris.alter.AlterJobV2.JobState.FINISHED);
cancelledNum = materializedViewHandler.getAlterJobV2Num(
org.apache.doris.alter.AlterJobV2.JobState.CANCELLED);
totalNum = pendingNum + runningNum + finishedNum + cancelledNum;
result.addRow(Lists.newArrayList(ROLLUP, pendingNum.toString(), runningNum.toString(), finishedNum.toString(),
cancelledNum.toString(), totalNum.toString()));
// schema change
SchemaChangeHandler schemaChangeHandler = Env.getCurrentEnv().getSchemaChangeHandler();
pendingNum = schemaChangeHandler.getAlterJobV2Num(org.apache.doris.alter.AlterJobV2.JobState.PENDING);
runningNum = schemaChangeHandler.getAlterJobV2Num(org.apache.doris.alter.AlterJobV2.JobState.WAITING_TXN)
+ schemaChangeHandler.getAlterJobV2Num(org.apache.doris.alter.AlterJobV2.JobState.RUNNING);
finishedNum = schemaChangeHandler.getAlterJobV2Num(org.apache.doris.alter.AlterJobV2.JobState.FINISHED);
cancelledNum = schemaChangeHandler.getAlterJobV2Num(org.apache.doris.alter.AlterJobV2.JobState.CANCELLED);
totalNum = pendingNum + runningNum + finishedNum + cancelledNum;
result.addRow(Lists.newArrayList(SCHEMA_CHANGE, pendingNum.toString(), runningNum.toString(),
finishedNum.toString(), cancelledNum.toString(), totalNum.toString()));
// export
ExportMgr exportMgr = Env.getCurrentEnv().getExportMgr();
pendingNum = exportMgr.getJobNum(ExportJob.JobState.PENDING);
runningNum = exportMgr.getJobNum(ExportJob.JobState.EXPORTING);
finishedNum = exportMgr.getJobNum(ExportJob.JobState.FINISHED);
cancelledNum = exportMgr.getJobNum(ExportJob.JobState.CANCELLED);
totalNum = pendingNum + runningNum + finishedNum + cancelledNum;
result.addRow(Lists.newArrayList(EXPORT, pendingNum.toString(), runningNum.toString(), finishedNum.toString(),
cancelledNum.toString(), totalNum.toString()));
return result;
}
}

LoadJobProcNode.java

@ -18,13 +18,10 @@
package org.apache.doris.common.proc;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.load.Load;
import org.apache.doris.load.loadv2.LoadManager;
import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
import java.util.List;
public class LoadJobProcNode implements ProcNodeInterface {
public static final ImmutableList<String> TITLE_NAMES = new ImmutableList.Builder<String>()
@ -32,11 +29,11 @@ public class LoadJobProcNode implements ProcNodeInterface {
.add("PartitionId").add("LoadVersion")
.build();
private Load load;
private LoadManager loadManager;
private long jobId;
public LoadJobProcNode(Load load, long jobId) {
this.load = load;
public LoadJobProcNode(LoadManager loadManager, long jobId) {
this.loadManager = loadManager;
this.jobId = jobId;
}
@ -45,17 +42,7 @@ public class LoadJobProcNode implements ProcNodeInterface {
BaseProcResult result = new BaseProcResult();
result.setNames(TITLE_NAMES);
List<List<Comparable>> infos = load.getLoadJobUnfinishedInfo(jobId);
// In this step, the detail of load job which is belongs to LoadManager will not be presented.
// The reason is that there are no detail info in load job which is streaming during loading.
// So it don't need to invoke the LoadManager here.
for (List<Comparable> info : infos) {
List<String> oneInfo = new ArrayList<String>(TITLE_NAMES.size());
for (Comparable element : info) {
oneInfo.add(element.toString());
}
result.addRow(oneInfo);
}
// TODO get results from LoadManager. Before do that, update implement of LoadManager:record detail info
return result;
}

LoadProcDir.java

@ -18,16 +18,13 @@
package org.apache.doris.common.proc;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.load.Load;
import org.apache.doris.load.loadv2.LoadManager;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
public class LoadProcDir implements ProcDirInterface {
@ -47,30 +44,30 @@ public class LoadProcDir implements ProcDirInterface {
private static final int LIMIT = 2000;
private Load load;
private LoadManager loadManager;
private Database db;
public LoadProcDir(Load load, Database db) {
this.load = load;
public LoadProcDir(LoadManager loadManager, Database db) {
this.loadManager = loadManager;
this.db = db;
}
@Override
public ProcResult fetchResult() throws AnalysisException {
Preconditions.checkNotNull(db);
Preconditions.checkNotNull(load);
BaseProcResult result = new BaseProcResult();
result.setNames(TITLE_NAMES);
// merge load job from load and loadManager
LinkedList<List<Comparable>> loadJobInfos = load.getLoadJobInfosByDb(db.getId(), db.getFullName(),
null, false, null);
loadJobInfos.addAll(Env.getCurrentEnv().getLoadManager().getLoadJobInfosByDb(db.getId(), null,
false,
null));
List<List<Comparable>> loadJobInfos;
// db is null means need total result of all databases
if (db == null) {
loadJobInfos = loadManager.getAllLoadJobInfos();
} else {
loadJobInfos = loadManager.getLoadJobInfosByDb(db.getId(), null, false, null);
}
int counter = 0;
Iterator<List<Comparable>> iterator = loadJobInfos.descendingIterator();
Iterator<List<Comparable>> iterator = loadJobInfos.iterator();
while (iterator.hasNext()) {
List<Comparable> infoStr = iterator.next();
List<String> oneInfo = new ArrayList<String>(TITLE_NAMES.size());
@ -99,7 +96,7 @@ public class LoadProcDir implements ProcDirInterface {
throw new AnalysisException("Invalid job id format: " + jobIdStr);
}
return new LoadJobProcNode(load, jobId);
return new LoadJobProcNode(loadManager, jobId);
}
public static int analyzeColumn(String columnName) throws AnalysisException {

RollupProcDir.java

@ -62,13 +62,18 @@ public class RollupProcDir implements ProcDirInterface {
@Override
public ProcResult fetchResult() throws AnalysisException {
Preconditions.checkNotNull(db);
Preconditions.checkNotNull(materializedViewHandler);
BaseProcResult result = new BaseProcResult();
result.setNames(TITLE_NAMES);
List<List<Comparable>> rollupJobInfos = materializedViewHandler.getAlterJobInfosByDb(db);
List<List<Comparable>> rollupJobInfos;
// db is null means need total result of all databases
if (db == null) {
rollupJobInfos = materializedViewHandler.getAllAlterJobInfos();
} else {
rollupJobInfos = materializedViewHandler.getAlterJobInfosByDb(db);
}
for (List<Comparable> infoStr : rollupJobInfos) {
List<String> oneInfo = new ArrayList<String>(TITLE_NAMES.size());
for (Comparable element : infoStr) {

SchemaChangeProcDir.java

@ -176,13 +176,18 @@ public class SchemaChangeProcDir implements ProcDirInterface {
@Override
public ProcResult fetchResult() throws AnalysisException {
Preconditions.checkNotNull(db);
Preconditions.checkNotNull(schemaChangeHandler);
BaseProcResult result = new BaseProcResult();
result.setNames(TITLE_NAMES);
List<List<Comparable>> schemaChangeJobInfos = schemaChangeHandler.getAlterJobInfosByDb(db);
List<List<Comparable>> schemaChangeJobInfos;
// db is null means need total result of all databases
if (db == null) {
schemaChangeJobInfos = schemaChangeHandler.getAllAlterJobInfos();
} else {
schemaChangeJobInfos = schemaChangeHandler.getAlterJobInfosByDb(db);
}
for (List<Comparable> infoStr : schemaChangeJobInfos) {
List<String> oneInfo = new ArrayList<String>(TITLE_NAMES.size());
for (Comparable element : infoStr) {

DeleteHandler.java

@ -833,7 +833,20 @@ public class DeleteHandler implements Writable {
}
String dbName = db.getFullName();
List<DeleteInfo> deleteInfoList = dbToDeleteInfos.get(dbId);
List<DeleteInfo> deleteInfoList = new ArrayList<>();
if (dbId == -1) {
for (Long tempDbId : dbToDeleteInfos.keySet()) {
if (!Env.getCurrentEnv().getAccessManager().checkDbPriv(ConnectContext.get(),
Env.getCurrentEnv().getCatalogMgr().getDbNullable(tempDbId).getFullName(),
PrivPredicate.LOAD)) {
continue;
}
deleteInfoList.addAll(dbToDeleteInfos.get(tempDbId));
}
} else {
deleteInfoList = dbToDeleteInfos.get(dbId);
}
readLock();
try {

ExportMgr.java

@ -223,77 +223,16 @@ public class ExportMgr {
}
}
// check auth
TableName tableName = job.getTableName();
if (tableName == null || tableName.getTbl().equals("DUMMY")) {
// forward compatibility, no table name is saved before
Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
if (db == null) {
continue;
}
if (!Env.getCurrentEnv().getAccessManager().checkDbPriv(ConnectContext.get(),
db.getFullName(), PrivPredicate.SHOW)) {
continue;
}
} else {
if (!Env.getCurrentEnv().getAccessManager().checkTblPriv(ConnectContext.get(),
tableName.getDb(), tableName.getTbl(),
PrivPredicate.SHOW)) {
continue;
}
}
if (states != null) {
if (!states.contains(state)) {
continue;
}
}
List<Comparable> jobInfo = new ArrayList<Comparable>();
jobInfo.add(id);
jobInfo.add(jobLabel);
jobInfo.add(state.name());
jobInfo.add(job.getProgress() + "%");
// task infos
Map<String, Object> infoMap = Maps.newHashMap();
List<String> partitions = job.getPartitions();
if (partitions == null) {
partitions = Lists.newArrayList();
partitions.add("*");
// check auth
if (isJobShowable(job)) {
exportJobInfos.add(composeExportJobInfo(job));
}
infoMap.put("db", job.getTableName().getDb());
infoMap.put("tbl", job.getTableName().getTbl());
if (job.getWhereExpr() != null) {
infoMap.put("where expr", job.getWhereExpr().toMySql());
}
infoMap.put("partitions", partitions);
infoMap.put("broker", job.getBrokerDesc().getName());
infoMap.put("column separator", job.getColumnSeparator());
infoMap.put("line delimiter", job.getLineDelimiter());
infoMap.put("exec mem limit", job.getExecMemLimit());
infoMap.put("columns", job.getColumns());
infoMap.put("coord num", job.getCoordList().size());
infoMap.put("tablet num", job.getTabletLocations() == null ? -1 : job.getTabletLocations().size());
jobInfo.add(new Gson().toJson(infoMap));
// path
jobInfo.add(job.getShowExportPath());
jobInfo.add(TimeUtils.longToTimeString(job.getCreateTimeMs()));
jobInfo.add(TimeUtils.longToTimeString(job.getStartTimeMs()));
jobInfo.add(TimeUtils.longToTimeString(job.getFinishTimeMs()));
jobInfo.add(job.getTimeoutSecond());
// error msg
if (job.getState() == ExportJob.JobState.CANCELLED) {
ExportFailMsg failMsg = job.getFailMsg();
jobInfo.add("type:" + failMsg.getCancelType() + "; msg:" + failMsg.getMsg());
} else {
jobInfo.add(FeConstants.null_string);
}
exportJobInfos.add(jobInfo);
if (++counter >= resultNum) {
break;
@ -322,6 +261,112 @@ public class ExportMgr {
return results;
}
public List<List<String>> getExportJobInfos(long limit) {
long resultNum = limit == -1L ? Integer.MAX_VALUE : limit;
LinkedList<List<Comparable>> exportJobInfos = new LinkedList<List<Comparable>>();
readLock();
try {
int counter = 0;
for (ExportJob job : idToJob.values()) {
// check auth
if (isJobShowable(job)) {
exportJobInfos.add(composeExportJobInfo(job));
}
if (++counter >= resultNum) {
break;
}
}
} finally {
readUnlock();
}
// order by
ListComparator<List<Comparable>> comparator = null;
// sort by id asc
comparator = new ListComparator<List<Comparable>>(0);
Collections.sort(exportJobInfos, comparator);
List<List<String>> results = Lists.newArrayList();
for (List<Comparable> list : exportJobInfos) {
results.add(list.stream().map(e -> e.toString()).collect(Collectors.toList()));
}
return results;
}
public boolean isJobShowable(ExportJob job) {
TableName tableName = job.getTableName();
if (tableName == null || tableName.getTbl().equals("DUMMY")) {
// forward compatibility, no table name is saved before
Database db = Env.getCurrentInternalCatalog().getDbNullable(job.getDbId());
if (db == null) {
return false;
}
if (!Env.getCurrentEnv().getAccessManager().checkDbPriv(ConnectContext.get(),
db.getFullName(), PrivPredicate.SHOW)) {
return false;
}
} else {
if (!Env.getCurrentEnv().getAccessManager().checkTblPriv(ConnectContext.get(),
tableName.getDb(), tableName.getTbl(),
PrivPredicate.SHOW)) {
return false;
}
}
return true;
}
private List<Comparable> composeExportJobInfo(ExportJob job) {
List<Comparable> jobInfo = new ArrayList<Comparable>();
jobInfo.add(job.getId());
jobInfo.add(job.getLabel());
jobInfo.add(job.getState().name());
jobInfo.add(job.getProgress() + "%");
// task infos
Map<String, Object> infoMap = Maps.newHashMap();
List<String> partitions = job.getPartitions();
if (partitions == null) {
partitions = Lists.newArrayList();
partitions.add("*");
}
infoMap.put("db", job.getTableName().getDb());
infoMap.put("tbl", job.getTableName().getTbl());
if (job.getWhereExpr() != null) {
infoMap.put("where expr", job.getWhereExpr().toMySql());
}
infoMap.put("partitions", partitions);
infoMap.put("broker", job.getBrokerDesc().getName());
infoMap.put("column separator", job.getColumnSeparator());
infoMap.put("line delimiter", job.getLineDelimiter());
infoMap.put("exec mem limit", job.getExecMemLimit());
infoMap.put("columns", job.getColumns());
infoMap.put("coord num", job.getCoordList().size());
infoMap.put("tablet num", job.getTabletLocations() == null ? -1 : job.getTabletLocations().size());
jobInfo.add(new Gson().toJson(infoMap));
// path
jobInfo.add(job.getShowExportPath());
jobInfo.add(TimeUtils.longToTimeString(job.getCreateTimeMs()));
jobInfo.add(TimeUtils.longToTimeString(job.getStartTimeMs()));
jobInfo.add(TimeUtils.longToTimeString(job.getFinishTimeMs()));
jobInfo.add(job.getTimeoutSecond());
// error msg
if (job.getState() == ExportJob.JobState.CANCELLED) {
ExportFailMsg failMsg = job.getFailMsg();
jobInfo.add("type:" + failMsg.getCancelType() + "; msg:" + failMsg.getMsg());
} else {
jobInfo.add(FeConstants.null_string);
}
return jobInfo;
}
public void removeOldExportJobs() {
long currentTimeMs = System.currentTimeMillis();
@ -376,4 +421,25 @@ public class ExportMgr {
}
return size;
}
public long getJobNum(ExportJob.JobState state) {
int size = 0;
readLock();
try {
for (ExportJob job : idToJob.values()) {
if (!Env.getCurrentEnv().getAccessManager().checkDbPriv(ConnectContext.get(),
Env.getCurrentEnv().getCatalogMgr().getDbNullable(job.getDbId()).getFullName(),
PrivPredicate.LOAD)) {
continue;
}
if (job.getState() == state) {
++size;
}
}
} finally {
readUnlock();
}
return size;
}
}

Load.java

@ -1331,6 +1331,31 @@ public class Load {
}
}
public long getLoadJobNum(JobState jobState) {
readLock();
try {
List<LoadJob> loadJobs = new ArrayList<>();
for (Long dbId : dbToLoadJobs.keySet()) {
if (!Env.getCurrentEnv().getAccessManager().checkDbPriv(ConnectContext.get(),
Env.getCurrentEnv().getCatalogMgr().getDbNullable(dbId).getFullName(),
PrivPredicate.LOAD)) {
continue;
}
loadJobs.addAll(this.dbToLoadJobs.get(dbId));
}
int jobNum = 0;
for (LoadJob job : loadJobs) {
if (job.getState() == jobState) {
++jobNum;
}
}
return jobNum;
} finally {
readUnlock();
}
}
public LoadJob getLoadJob(long jobId) {
readLock();
try {
@ -1340,6 +1365,151 @@ public class Load {
}
}
public LinkedList<List<Comparable>> getAllLoadJobInfos() {
LinkedList<List<Comparable>> loadJobInfos = new LinkedList<List<Comparable>>();
readLock();
try {
List<LoadJob> loadJobs = new ArrayList<>();
for (Long dbId : dbToLoadJobs.keySet()) {
if (!Env.getCurrentEnv().getAccessManager().checkDbPriv(ConnectContext.get(),
Env.getCurrentEnv().getCatalogMgr().getDbNullable(dbId).getFullName(),
PrivPredicate.LOAD)) {
continue;
}
loadJobs.addAll(this.dbToLoadJobs.get(dbId));
}
if (loadJobs.size() == 0) {
return loadJobInfos;
}
long start = System.currentTimeMillis();
LOG.debug("begin to get load job info, size: {}", loadJobs.size());
for (LoadJob loadJob : loadJobs) {
// filter first
String dbName = Env.getCurrentEnv().getCatalogMgr().getDbNullable(loadJob.getDbId()).getFullName();
// check auth
Set<String> tableNames = loadJob.getTableNames();
boolean auth = true;
for (String tblName : tableNames) {
if (!Env.getCurrentEnv().getAccessManager().checkTblPriv(ConnectContext.get(), dbName,
tblName, PrivPredicate.LOAD)) {
auth = false;
break;
}
}
if (!auth) {
continue;
}
loadJobInfos.add(composeJobInfoByLoadJob(loadJob));
} // end for loadJobs
LOG.debug("finished to get load job info, cost: {}", (System.currentTimeMillis() - start));
} finally {
readUnlock();
}
return loadJobInfos;
}
private List<Comparable> composeJobInfoByLoadJob(LoadJob loadJob) {
List<Comparable> jobInfo = new ArrayList<Comparable>();
// jobId
jobInfo.add(loadJob.getId());
// label
jobInfo.add(loadJob.getLabel());
// state
jobInfo.add(loadJob.getState().name());
// progress
switch (loadJob.getState()) {
case PENDING:
jobInfo.add("ETL:0%; LOAD:0%");
break;
case ETL:
jobInfo.add("ETL:" + loadJob.getProgress() + "%; LOAD:0%");
break;
case LOADING:
jobInfo.add("ETL:100%; LOAD:" + loadJob.getProgress() + "%");
break;
case QUORUM_FINISHED:
case FINISHED:
jobInfo.add("ETL:100%; LOAD:100%");
break;
case CANCELLED:
default:
jobInfo.add("ETL:N/A; LOAD:N/A");
break;
}
// type
jobInfo.add(loadJob.getEtlJobType().name());
// etl info
EtlStatus status = loadJob.getEtlJobStatus();
if (status == null || status.getState() == TEtlState.CANCELLED) {
jobInfo.add(FeConstants.null_string);
} else {
Map<String, String> counters = status.getCounters();
List<String> info = Lists.newArrayList();
for (String key : counters.keySet()) {
// XXX: internal etl job return all counters
if (key.equalsIgnoreCase("HDFS bytes read")
|| key.equalsIgnoreCase("Map input records")
|| key.startsWith("dpp.")
|| loadJob.getEtlJobType() == EtlJobType.MINI) {
info.add(key + "=" + counters.get(key));
}
} // end for counters
if (info.isEmpty()) {
jobInfo.add(FeConstants.null_string);
} else {
jobInfo.add(StringUtils.join(info, "; "));
}
}
// task info
jobInfo.add("cluster:" + loadJob.getHadoopCluster()
+ "; timeout(s):" + loadJob.getTimeoutSecond()
+ "; max_filter_ratio:" + loadJob.getMaxFilterRatio());
// error msg
if (loadJob.getState() == JobState.CANCELLED) {
FailMsg failMsg = loadJob.getFailMsg();
jobInfo.add("type:" + failMsg.getCancelType() + "; msg:" + failMsg.getMsg());
} else {
jobInfo.add(FeConstants.null_string);
}
// create time
jobInfo.add(TimeUtils.longToTimeString(loadJob.getCreateTimeMs()));
// etl start time
jobInfo.add(TimeUtils.longToTimeString(loadJob.getEtlStartTimeMs()));
// etl end time
jobInfo.add(TimeUtils.longToTimeString(loadJob.getEtlFinishTimeMs()));
// load start time
jobInfo.add(TimeUtils.longToTimeString(loadJob.getLoadStartTimeMs()));
// load end time
jobInfo.add(TimeUtils.longToTimeString(loadJob.getLoadFinishTimeMs()));
// tracking url
jobInfo.add(status.getTrackingUrl());
// job detail(not used for hadoop load, just return an empty string)
jobInfo.add("");
// transaction id
jobInfo.add(loadJob.getTransactionId());
// error tablets(not used for hadoop load, just return an empty string)
jobInfo.add("");
// user
jobInfo.add(loadJob.getUser());
// comment
jobInfo.add(loadJob.getComment());
return jobInfo;
}
public LinkedList<List<Comparable>> getLoadJobInfosByDb(long dbId, String dbName, String labelValue,
boolean accurateMatch, Set<JobState> states) throws AnalysisException {
LinkedList<List<Comparable>> loadJobInfos = new LinkedList<List<Comparable>>();
@ -1403,103 +1573,7 @@ public class Load {
}
}
List<Comparable> jobInfo = new ArrayList<Comparable>();
// jobId
jobInfo.add(loadJob.getId());
// label
jobInfo.add(label);
// state
jobInfo.add(state.name());
// progress
switch (loadJob.getState()) {
case PENDING:
jobInfo.add("ETL:0%; LOAD:0%");
break;
case ETL:
jobInfo.add("ETL:" + loadJob.getProgress() + "%; LOAD:0%");
break;
case LOADING:
jobInfo.add("ETL:100%; LOAD:" + loadJob.getProgress() + "%");
break;
case QUORUM_FINISHED:
jobInfo.add("ETL:100%; LOAD:100%");
break;
case FINISHED:
jobInfo.add("ETL:100%; LOAD:100%");
break;
case CANCELLED:
jobInfo.add("ETL:N/A; LOAD:N/A");
break;
default:
jobInfo.add("ETL:N/A; LOAD:N/A");
break;
}
// type
jobInfo.add(loadJob.getEtlJobType().name());
// etl info
EtlStatus status = loadJob.getEtlJobStatus();
if (status == null || status.getState() == TEtlState.CANCELLED) {
jobInfo.add(FeConstants.null_string);
} else {
Map<String, String> counters = status.getCounters();
List<String> info = Lists.newArrayList();
for (String key : counters.keySet()) {
// XXX: internal etl job return all counters
if (key.equalsIgnoreCase("HDFS bytes read")
|| key.equalsIgnoreCase("Map input records")
|| key.startsWith("dpp.")
|| loadJob.getEtlJobType() == EtlJobType.MINI) {
info.add(key + "=" + counters.get(key));
}
} // end for counters
if (info.isEmpty()) {
jobInfo.add(FeConstants.null_string);
} else {
jobInfo.add(StringUtils.join(info, "; "));
}
}
// task info
jobInfo.add("cluster:" + loadJob.getHadoopCluster()
+ "; timeout(s):" + loadJob.getTimeoutSecond()
+ "; max_filter_ratio:" + loadJob.getMaxFilterRatio());
// error msg
if (loadJob.getState() == JobState.CANCELLED) {
FailMsg failMsg = loadJob.getFailMsg();
jobInfo.add("type:" + failMsg.getCancelType() + "; msg:" + failMsg.getMsg());
} else {
jobInfo.add(FeConstants.null_string);
}
// create time
jobInfo.add(TimeUtils.longToTimeString(loadJob.getCreateTimeMs()));
// etl start time
jobInfo.add(TimeUtils.longToTimeString(loadJob.getEtlStartTimeMs()));
// etl end time
jobInfo.add(TimeUtils.longToTimeString(loadJob.getEtlFinishTimeMs()));
// load start time
jobInfo.add(TimeUtils.longToTimeString(loadJob.getLoadStartTimeMs()));
// load end time
jobInfo.add(TimeUtils.longToTimeString(loadJob.getLoadFinishTimeMs()));
// tracking url
jobInfo.add(status.getTrackingUrl());
// job detail(not used for hadoop load, just return an empty string)
jobInfo.add("");
// transaction id
jobInfo.add(loadJob.getTransactionId());
// error tablets(not used for hadoop load, just return an empty string)
jobInfo.add("");
// user
jobInfo.add(loadJob.getUser());
// comment
jobInfo.add(loadJob.getComment());
loadJobInfos.add(jobInfo);
loadJobInfos.add(composeJobInfoByLoadJob(loadJob));
} // end for loadJobs
LOG.debug("finished to get load job info, cost: {}", (System.currentTimeMillis() - start));

LoadManager.java

@ -43,7 +43,9 @@ import org.apache.doris.load.EtlJobType;
import org.apache.doris.load.FailMsg;
import org.apache.doris.load.FailMsg.CancelType;
import org.apache.doris.load.Load;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.persist.CleanLabelOperationLog;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.OriginStatement;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.transaction.DatabaseTransactionMgr;
@ -66,6 +68,7 @@ import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
@ -345,6 +348,30 @@ public class LoadManager implements Writable {
}
}
/**
* Get load job num, used by proc.
**/
public int getLoadJobNum(JobState jobState) {
readLock();
try {
Map<String, List<LoadJob>> labelToLoadJobs = new HashMap<>();
for (Long dbId : dbIdToLabelToLoadJobs.keySet()) {
if (!Env.getCurrentEnv().getAccessManager().checkDbPriv(ConnectContext.get(),
Env.getCurrentEnv().getCatalogMgr().getDbNullable(dbId).getFullName(),
PrivPredicate.LOAD)) {
continue;
}
labelToLoadJobs.putAll(dbIdToLabelToLoadJobs.get(dbId));
}
List<LoadJob> loadJobList =
labelToLoadJobs.values().stream().flatMap(entity -> entity.stream()).collect(Collectors.toList());
return (int) loadJobList.stream().filter(entity -> entity.getState() == jobState).count();
} finally {
readUnlock();
}
}
/**
* Get load job num, used by metric.
@ -538,6 +565,40 @@ public class LoadManager implements Writable {
}
}
public List<List<Comparable>> getAllLoadJobInfos() {
LinkedList<List<Comparable>> loadJobInfos = new LinkedList<List<Comparable>>();
readLock();
try {
Map<String, List<LoadJob>> labelToLoadJobs = new HashMap<>();
for (Long dbId : dbIdToLabelToLoadJobs.keySet()) {
if (!Env.getCurrentEnv().getAccessManager().checkDbPriv(ConnectContext.get(),
Env.getCurrentEnv().getCatalogMgr().getDbNullable(dbId).getFullName(),
PrivPredicate.LOAD)) {
continue;
}
labelToLoadJobs.putAll(dbIdToLabelToLoadJobs.get(dbId));
}
List<LoadJob> loadJobList = Lists.newArrayList();
loadJobList.addAll(
labelToLoadJobs.values().stream().flatMap(Collection::stream).collect(Collectors.toList()));
// check state
for (LoadJob loadJob : loadJobList) {
try {
// add load job info
loadJobInfos.add(loadJob.getShowInfo());
} catch (DdlException e) {
continue;
}
}
return loadJobInfos;
} finally {
readUnlock();
}
}
/**
* Get load job info.
**/