[Enhancement](Broker Load) New progress manager for showing loading progress status (#19170)
This work is at an early stage. The reported progress is not yet accurate, because the scan-range granularity is too coarse to gather precise information; in addition, only the file scan node and the import job currently support the new progress manager.
## How it works
for example, when we use the following load query:
```
LOAD LABEL test_broker_load
(
DATA INFILE("XXX")
INTO TABLE `XXX`
......
)
```
Initial Progress: the query will call `BrokerLoadJob` to create job, then `coordinator` is called to calculate scan range and its location.
Update Progress: the BE reports its runtime_state to the FE, and the FE updates the progress status according to the jobId and fragmentId.
we can use `show load` to see the progress
PENDING:
```
State: PENDING
Progress: 0.00%
```
LOADING:
```
State: LOADING
Progress: 14.29% (1/7)
```
FINISH:
```
State: FINISHED
Progress: 100.00% (7/7)
```
At current time, full output of `show load\G` looks like:
```
*************************** 1. row ***************************
JobId: 25052
Label: test_broker
State: LOADING
Progress: 0.00% (0/7)
Type: BROKER
EtlInfo: NULL
TaskInfo: cluster:N/A; timeout(s):250000; max_filter_ratio:0.0
ErrorMsg: NULL
CreateTime: 2023-05-03 20:53:13
EtlStartTime: 2023-05-03 20:53:15
EtlFinishTime: 2023-05-03 20:53:15
LoadStartTime: 2023-05-03 20:53:15
LoadFinishTime: NULL
URL: NULL
JobDetails: {"Unfinished backends":{"5a9a3ecd203049bc-85e39a765c043228":[10080]},"ScannedRows":39611808,"TaskNumber":1,"LoadBytes":7398908902,"All backends":{"5a9a3ecd203049bc-85e39a765c043228":[10080]},"FileNumber":1,"FileSize":7895697364}
TransactionId: 14015
ErrorTablets: {}
User: root
Comment:
```
## TODO:
1. The current partition granularity of the scan range is too coarse, resulting in uneven progress updates during loading.
2. Only broker load supports the new Progress Manager; add progress support for other query types.
This commit is contained in:
@ -157,6 +157,7 @@ import org.apache.doris.load.loadv2.LoadEtlChecker;
|
||||
import org.apache.doris.load.loadv2.LoadJobScheduler;
|
||||
import org.apache.doris.load.loadv2.LoadLoadingChecker;
|
||||
import org.apache.doris.load.loadv2.LoadManager;
|
||||
import org.apache.doris.load.loadv2.ProgressManager;
|
||||
import org.apache.doris.load.routineload.RoutineLoadManager;
|
||||
import org.apache.doris.load.routineload.RoutineLoadScheduler;
|
||||
import org.apache.doris.load.routineload.RoutineLoadTaskScheduler;
|
||||
@ -312,6 +313,7 @@ public class Env {
|
||||
private GlobalFunctionMgr globalFunctionMgr;
|
||||
private Load load;
|
||||
private LoadManager loadManager;
|
||||
private ProgressManager progressManager;
|
||||
private StreamLoadRecordMgr streamLoadRecordMgr;
|
||||
private RoutineLoadManager routineLoadManager;
|
||||
private SqlBlockRuleMgr sqlBlockRuleMgr;
|
||||
@ -629,6 +631,7 @@ public class Env {
|
||||
|
||||
this.loadJobScheduler = new LoadJobScheduler();
|
||||
this.loadManager = new LoadManager(loadJobScheduler);
|
||||
this.progressManager = new ProgressManager();
|
||||
this.streamLoadRecordMgr = new StreamLoadRecordMgr("stream_load_record_manager",
|
||||
Config.fetch_stream_load_record_interval_second * 1000L);
|
||||
this.loadEtlChecker = new LoadEtlChecker(loadManager);
|
||||
@ -3561,6 +3564,14 @@ public class Env {
|
||||
return loadManager;
|
||||
}
|
||||
|
||||
public ProgressManager getProgressManager() {
|
||||
return progressManager;
|
||||
}
|
||||
|
||||
public static ProgressManager getCurrentProgressManager() {
|
||||
return getCurrentEnv().getProgressManager();
|
||||
}
|
||||
|
||||
public StreamLoadRecordMgr getStreamLoadRecordMgr() {
|
||||
return streamLoadRecordMgr;
|
||||
}
|
||||
|
||||
@ -190,6 +190,8 @@ public class BrokerLoadJob extends BulkLoadJob {
|
||||
// divide job into broker loading task by table
|
||||
List<LoadLoadingTask> newLoadingTasks = Lists.newArrayList();
|
||||
this.jobProfile = new Profile("BrokerLoadJob " + id + ". " + label, true);
|
||||
ProgressManager progressManager = Env.getCurrentProgressManager();
|
||||
progressManager.registerProgressSimple(String.valueOf(id));
|
||||
MetaLockUtils.readLockTables(tableList);
|
||||
try {
|
||||
for (Map.Entry<FileGroupAggKey, List<BrokerFileGroup>> entry
|
||||
@ -215,7 +217,6 @@ public class BrokerLoadJob extends BulkLoadJob {
|
||||
// use newLoadingTasks to save new created loading tasks and submit them later.
|
||||
newLoadingTasks.add(task);
|
||||
// load id will be added to loadStatistic when executing this task
|
||||
|
||||
// save all related tables and rollups in transaction state
|
||||
TransactionState txnState = Env.getCurrentGlobalTransactionMgr()
|
||||
.getTransactionState(dbId, transactionId);
|
||||
|
||||
@ -760,18 +760,20 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
|
||||
jobInfo.add(state.name());
|
||||
|
||||
// progress
|
||||
// check null
|
||||
String progress = Env.getCurrentProgressManager().getProgressInfo(String.valueOf(id));
|
||||
switch (state) {
|
||||
case PENDING:
|
||||
jobInfo.add("ETL:0%; LOAD:0%");
|
||||
jobInfo.add("0%");
|
||||
break;
|
||||
case CANCELLED:
|
||||
jobInfo.add("ETL:N/A; LOAD:N/A");
|
||||
jobInfo.add(progress);
|
||||
break;
|
||||
case ETL:
|
||||
jobInfo.add("ETL:" + progress + "%; LOAD:0%");
|
||||
jobInfo.add(progress);
|
||||
break;
|
||||
default:
|
||||
jobInfo.add("ETL:100%; LOAD:" + progress + "%");
|
||||
jobInfo.add(progress);
|
||||
break;
|
||||
}
|
||||
|
||||
|
||||
@ -0,0 +1,129 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.load.loadv2;
|
||||
|
||||
import org.apache.doris.thrift.TUniqueId;
|
||||
|
||||
import com.google.common.collect.HashBasedTable;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.collect.Table;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* ProgressManager manage the progress of loading and exporting tasks
|
||||
*/
|
||||
public class ProgressManager {
|
||||
private static final Logger LOG = LogManager.getLogger(ProgressManager.class);
|
||||
|
||||
private Map<String, Progress> idToProgress = Maps.newConcurrentMap();
|
||||
|
||||
public void registerProgress(String id, int scannerNum) {
|
||||
LOG.debug("create {} with initial scannerNum {}", id, scannerNum);
|
||||
idToProgress.remove(id);
|
||||
idToProgress.put(id, new Progress(scannerNum));
|
||||
}
|
||||
|
||||
public void registerProgressSimple(String id) {
|
||||
registerProgress(id, 0);
|
||||
}
|
||||
|
||||
public void updateProgress(String id, TUniqueId queryId, TUniqueId fragmentId, int finishedScannerNum) {
|
||||
Progress progress = idToProgress.get(id);
|
||||
if (progress != null) {
|
||||
progress.updateFinishedScanNums(queryId, fragmentId, finishedScannerNum);
|
||||
} else {
|
||||
LOG.warn("progress[" + id + "] missing meta information");
|
||||
}
|
||||
}
|
||||
|
||||
public void addTotalScanNums(String id, int num) {
|
||||
Progress progress = idToProgress.get(id);
|
||||
if (progress != null) {
|
||||
progress.addTotalScanNums(num);
|
||||
}
|
||||
}
|
||||
|
||||
public String getProgressInfo(String id) {
|
||||
String progressInfo = "Unknown id: " + id;
|
||||
Progress progress = idToProgress.get(id);
|
||||
if (progress != null) {
|
||||
int finish = progress.getFinishedScanNums();
|
||||
int total = progress.getTotalScanNums();
|
||||
String currentProgress = String.format("%.2f", progress.getProgress());
|
||||
progressInfo = currentProgress + "% (" + finish + "/" + total + ")";
|
||||
}
|
||||
return progressInfo;
|
||||
}
|
||||
|
||||
static class Progress {
|
||||
// one job have multiple query, and the query can be divided into
|
||||
// separate fragments. finished scan ranges reported from BE is bound
|
||||
// to the query, so we need to store them all to save status.
|
||||
// table: queryId -> fragmentId -> scan ranges
|
||||
private Table<TUniqueId, TUniqueId, Integer> finishedScanNums = HashBasedTable.create();
|
||||
private int totalScanNums = 0;
|
||||
|
||||
public synchronized void addTotalScanNums(int num) {
|
||||
totalScanNums += num;
|
||||
}
|
||||
|
||||
public synchronized void updateFinishedScanNums(TUniqueId queryId, TUniqueId fragmentId, int finishedScanNum) {
|
||||
finishedScanNums.put(queryId, fragmentId, finishedScanNum);
|
||||
}
|
||||
|
||||
public int getTotalScanNums() {
|
||||
return totalScanNums;
|
||||
}
|
||||
|
||||
public int getFinishedScanNums() {
|
||||
int result = 0;
|
||||
for (Integer v : finishedScanNums.values()) {
|
||||
result += v;
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
public double getProgress() {
|
||||
// if no scan range found, the progress should be finished(100%)
|
||||
if (totalScanNums == 0) {
|
||||
return 100.0;
|
||||
}
|
||||
return getFinishedScanNums() * 100 / (double) totalScanNums;
|
||||
}
|
||||
|
||||
public Progress(int totalScanNums) {
|
||||
this.totalScanNums = totalScanNums;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append("Finished/Total: ");
|
||||
sb.append(getFinishedScanNums());
|
||||
sb.append("/");
|
||||
sb.append(totalScanNums);
|
||||
sb.append(" => ");
|
||||
sb.append(getProgress());
|
||||
sb.append("%");
|
||||
return sb.toString();
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -75,12 +75,15 @@ import org.apache.doris.system.Backend;
|
||||
import org.apache.doris.system.SystemInfoService;
|
||||
import org.apache.doris.task.LoadEtlTask;
|
||||
import org.apache.doris.thrift.PaloInternalServiceVersion;
|
||||
import org.apache.doris.thrift.TBrokerScanRange;
|
||||
import org.apache.doris.thrift.TDescriptorTable;
|
||||
import org.apache.doris.thrift.TErrorTabletInfo;
|
||||
import org.apache.doris.thrift.TEsScanRange;
|
||||
import org.apache.doris.thrift.TExecPlanFragmentParams;
|
||||
import org.apache.doris.thrift.TExecPlanFragmentParamsList;
|
||||
import org.apache.doris.thrift.TExternalScanRange;
|
||||
import org.apache.doris.thrift.TFileRangeDesc;
|
||||
import org.apache.doris.thrift.TFileScanRange;
|
||||
import org.apache.doris.thrift.TNetworkAddress;
|
||||
import org.apache.doris.thrift.TPaloScanRange;
|
||||
import org.apache.doris.thrift.TPipelineFragmentParams;
|
||||
@ -96,6 +99,7 @@ import org.apache.doris.thrift.TReportExecStatusParams;
|
||||
import org.apache.doris.thrift.TResourceLimit;
|
||||
import org.apache.doris.thrift.TRuntimeFilterParams;
|
||||
import org.apache.doris.thrift.TRuntimeFilterTargetParamsV2;
|
||||
import org.apache.doris.thrift.TScanRange;
|
||||
import org.apache.doris.thrift.TScanRangeLocation;
|
||||
import org.apache.doris.thrift.TScanRangeLocations;
|
||||
import org.apache.doris.thrift.TScanRangeParams;
|
||||
@ -205,6 +209,7 @@ public class Coordinator {
|
||||
private final List<PipelineExecContext> needCheckPipelineExecContexts = Lists.newArrayList();
|
||||
private ResultReceiver receiver;
|
||||
private final List<ScanNode> scanNodes;
|
||||
private int scanRangeNum = 0;
|
||||
// number of instances of this query, equals to
|
||||
// number of backends executing plan fragments on behalf of this query;
|
||||
// set in computeFragmentExecParams();
|
||||
@ -409,6 +414,10 @@ public class Coordinator {
|
||||
return queryId;
|
||||
}
|
||||
|
||||
public int getScanRangeNum() {
|
||||
return scanRangeNum;
|
||||
}
|
||||
|
||||
public void setQueryId(TUniqueId queryId) {
|
||||
this.queryId = queryId;
|
||||
}
|
||||
@ -606,6 +615,7 @@ public class Coordinator {
|
||||
List<Long> relatedBackendIds = Lists.newArrayList(addressToBackendID.values());
|
||||
Env.getCurrentEnv().getLoadManager().initJobProgress(jobId, queryId, instanceIds,
|
||||
relatedBackendIds);
|
||||
Env.getCurrentEnv().getProgressManager().addTotalScanNums(String.valueOf(jobId), scanRangeNum);
|
||||
LOG.info("dispatch load job: {} to {}", DebugUtil.printId(queryId), addressToBackendID.keySet());
|
||||
}
|
||||
executionProfile.markInstances(instanceIds);
|
||||
@ -1934,6 +1944,7 @@ public class Coordinator {
|
||||
TScanRangeParams scanRangeParams = new TScanRangeParams();
|
||||
scanRangeParams.scan_range = location.scan_range;
|
||||
scanRangeParamsList.add(scanRangeParams);
|
||||
updateScanRangeNumByScanRange(scanRangeParams);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -2043,6 +2054,7 @@ public class Coordinator {
|
||||
scanRangeParams.scan_range = scanRangeLocations.scan_range;
|
||||
scanRangeParams.setVolumeId(minLocation.volume_id);
|
||||
scanRangeParamsList.add(scanRangeParams);
|
||||
updateScanRangeNumByScanRange(scanRangeParams);
|
||||
}
|
||||
}
|
||||
|
||||
@ -2076,9 +2088,34 @@ public class Coordinator {
|
||||
// Volume is optional, so we need to set the value and the is-set bit
|
||||
scanRangeParams.setVolumeId(minLocation.volume_id);
|
||||
scanRangeParamsList.add(scanRangeParams);
|
||||
updateScanRangeNumByScanRange(scanRangeParams);
|
||||
}
|
||||
}
|
||||
|
||||
private void updateScanRangeNumByScanRange(TScanRangeParams param) {
|
||||
TScanRange scanRange = param.getScanRange();
|
||||
if (scanRange == null) {
|
||||
return;
|
||||
}
|
||||
TBrokerScanRange brokerScanRange = scanRange.getBrokerScanRange();
|
||||
if (brokerScanRange != null) {
|
||||
scanRangeNum += brokerScanRange.getRanges().size();
|
||||
}
|
||||
TExternalScanRange externalScanRange = scanRange.getExtScanRange();
|
||||
if (externalScanRange != null) {
|
||||
TFileScanRange fileScanRange = externalScanRange.getFileScanRange();
|
||||
if (fileScanRange != null) {
|
||||
scanRangeNum += fileScanRange.getRanges().size();
|
||||
}
|
||||
}
|
||||
TPaloScanRange paloScanRange = scanRange.getPaloScanRange();
|
||||
if (paloScanRange != null) {
|
||||
scanRangeNum = scanRangeNum + 1;
|
||||
}
|
||||
// TODO: more ranges?
|
||||
}
|
||||
|
||||
// update job progress from BE
|
||||
public void updateFragmentExecStatus(TReportExecStatusParams params) {
|
||||
if (enablePipelineEngine) {
|
||||
PipelineExecContext ctx = pipelineExecContexts.get(Pair.of(params.getFragmentId(), params.getBackendId()));
|
||||
@ -2127,12 +2164,6 @@ public class Coordinator {
|
||||
}
|
||||
executionProfile.markOneInstanceDone(params.getFragmentInstanceId());
|
||||
}
|
||||
|
||||
if (params.isSetLoadedRows()) {
|
||||
Env.getCurrentEnv().getLoadManager().updateJobProgress(
|
||||
jobId, params.getBackendId(), params.getQueryId(), params.getFragmentInstanceId(),
|
||||
params.getLoadedRows(), params.getLoadedBytes(), params.isDone());
|
||||
}
|
||||
} else {
|
||||
if (params.backend_num >= backendExecStates.size()) {
|
||||
LOG.warn("unknown backend number: {}, expected less than: {}",
|
||||
@ -2185,12 +2216,14 @@ public class Coordinator {
|
||||
}
|
||||
executionProfile.markOneInstanceDone(params.getFragmentInstanceId());
|
||||
}
|
||||
}
|
||||
|
||||
if (params.isSetLoadedRows()) {
|
||||
Env.getCurrentEnv().getLoadManager().updateJobProgress(
|
||||
jobId, params.getBackendId(), params.getQueryId(), params.getFragmentInstanceId(),
|
||||
params.getLoadedRows(), params.getLoadedBytes(), params.isDone());
|
||||
}
|
||||
if (params.isSetLoadedRows()) {
|
||||
Env.getCurrentEnv().getLoadManager().updateJobProgress(
|
||||
jobId, params.getBackendId(), params.getQueryId(), params.getFragmentInstanceId(),
|
||||
params.getLoadedRows(), params.getLoadedBytes(), params.isDone());
|
||||
Env.getCurrentEnv().getProgressManager().updateProgress(String.valueOf(jobId),
|
||||
params.getQueryId(), params.getFragmentInstanceId(), params.getFinishedScanRanges());
|
||||
}
|
||||
}
|
||||
|
||||
@ -2419,6 +2452,7 @@ public class Coordinator {
|
||||
TScanRangeParams scanRangeParams = new TScanRangeParams();
|
||||
scanRangeParams.scan_range = location.scan_range;
|
||||
scanRangeParamsList.add(scanRangeParams);
|
||||
updateScanRangeNumByScanRange(scanRangeParams);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user