From 9edbfa37cd9119228a7897035110f4320f030845 Mon Sep 17 00:00:00 2001 From: Yusheng Xu <30896780+1330571@users.noreply.github.com> Date: Sat, 6 May 2023 22:44:40 +0800 Subject: [PATCH] [Enhancement](Broker Load) New progress manager for showing loading progress status (#19170) This work is at an early stage. The reported progress is not yet accurate because the scan ranges are too coarse-grained for gathering progress information. In addition, only the file scan node and import jobs support the new progress manager. ## How it works For example, when we use the following load query: ``` LOAD LABEL test_broker_load ( DATA INFILE("XXX") INTO TABLE `XXX` ...... ) ``` Initial Progress: the query calls `BrokerLoadJob` to create the job, then the `coordinator` is called to calculate the scan ranges and their locations. Update Progress: BE reports its runtime_state to FE, and FE updates the progress status according to jobID and fragmentID. We can use `show load` to see the progress. PENDING: ``` State: PENDING Progress: 0.00% ``` LOADING: ``` State: LOADING Progress: 14.29% (1/7) ``` FINISHED: ``` State: FINISHED Progress: 100.00% (7/7) ``` Currently, the full output of `show load\G` looks like: ``` *************************** 1. row *************************** JobId: 25052 Label: test_broker State: LOADING Progress: 0.00% (0/7) Type: BROKER EtlInfo: NULL TaskInfo: cluster:N/A; timeout(s):250000; max_filter_ratio:0.0 ErrorMsg: NULL CreateTime: 2023-05-03 20:53:13 EtlStartTime: 2023-05-03 20:53:15 EtlFinishTime: 2023-05-03 20:53:15 LoadStartTime: 2023-05-03 20:53:15 LoadFinishTime: NULL URL: NULL JobDetails: {"Unfinished backends":{"5a9a3ecd203049bc-85e39a765c043228":[10080]},"ScannedRows":39611808,"TaskNumber":1,"LoadBytes":7398908902,"All backends":{"5a9a3ecd203049bc-85e39a765c043228":[10080]},"FileNumber":1,"FileSize":7895697364} TransactionId: 14015 ErrorTablets: {} User: root Comment: ``` ## TODO: 1. The current partition granularity of scan ranges is too coarse, resulting in uneven progress updates during loading. 2. 
Only broker load supports the new Progress Manager; add progress support for other query types. --- be/src/runtime/fragment_mgr.cpp | 1 + be/src/runtime/runtime_state.cpp | 3 + be/src/runtime/runtime_state.h | 7 + be/src/vec/exec/scan/vfile_scanner.cpp | 5 + .../java/org/apache/doris/catalog/Env.java | 11 ++ .../doris/load/loadv2/BrokerLoadJob.java | 3 +- .../org/apache/doris/load/loadv2/LoadJob.java | 10 +- .../doris/load/loadv2/ProgressManager.java | 129 ++++++++++++++++++ .../java/org/apache/doris/qe/Coordinator.java | 56 ++++++-- gensrc/thrift/FrontendService.thrift | 2 + 10 files changed, 211 insertions(+), 16 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/load/loadv2/ProgressManager.java diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 7e59123c20..0cca7fa10f 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -388,6 +388,7 @@ void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) { exec_status.set_t_status(&params); params.__set_done(req.done); params.__set_query_type(req.runtime_state->query_type()); + params.__set_finished_scan_ranges(req.runtime_state->num_finished_range()); DCHECK(req.runtime_state != nullptr); if (req.runtime_state->query_type() == TQueryType::LOAD && !req.done && req.status.ok()) { diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp index 57fb9e8972..dd6ee00399 100644 --- a/be/src/runtime/runtime_state.cpp +++ b/be/src/runtime/runtime_state.cpp @@ -60,6 +60,7 @@ RuntimeState::RuntimeState(const TUniqueId& fragment_instance_id, _num_rows_load_unselected(0), _num_print_error_rows(0), _num_bytes_load_total(0), + _num_finished_scan_range(0), _load_job_id(-1), _normal_row_number(0), _error_row_number(0), @@ -86,6 +87,7 @@ RuntimeState::RuntimeState(const TPlanFragmentExecParams& fragment_exec_params, _num_rows_load_unselected(0), _num_print_error_rows(0), _num_bytes_load_total(0), + _num_finished_scan_range(0), 
_normal_row_number(0), _error_row_number(0), _error_log_file_path(""), @@ -115,6 +117,7 @@ RuntimeState::RuntimeState(const TPipelineInstanceParams& pipeline_params, _num_rows_load_unselected(0), _num_print_error_rows(0), _num_bytes_load_total(0), + _num_finished_scan_range(0), _normal_row_number(0), _error_row_number(0), _error_log_file(nullptr) { diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index f2b50bd92f..6def2cc4e5 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -221,6 +221,8 @@ public: int64_t num_bytes_load_total() { return _num_bytes_load_total.load(); } + int64_t num_finished_range() { return _num_finished_scan_range.load(); } + int64_t num_rows_load_total() { return _num_rows_load_total.load(); } int64_t num_rows_load_filtered() { return _num_rows_load_filtered.load(); } @@ -239,6 +241,10 @@ public: _num_bytes_load_total.fetch_add(bytes_load); } + void update_num_finished_scan_range(int64_t finished_range) { + _num_finished_scan_range.fetch_add(finished_range); + } + void update_num_rows_load_filtered(int64_t num_rows) { _num_rows_load_filtered.fetch_add(num_rows); } @@ -466,6 +472,7 @@ private: std::atomic _num_print_error_rows; std::atomic _num_bytes_load_total; // total bytes read from source + std::atomic _num_finished_scan_range; std::vector _export_output_files; std::string _import_label; diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp index caf71684c2..d8db35adac 100644 --- a/be/src/vec/exec/scan/vfile_scanner.cpp +++ b/be/src/vec/exec/scan/vfile_scanner.cpp @@ -563,8 +563,13 @@ Status VFileScanner::_get_next_reader() { _src_block_init = false; if (_next_range >= _ranges.size()) { _scanner_eof = true; + _state->update_num_finished_scan_range(1); return Status::OK(); } + if (_next_range != 0) { + _state->update_num_finished_scan_range(1); + } + const TFileRangeDesc& range = _ranges[_next_range++]; // create reader for specific format 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index 920be8a5ac..ee6b25096e 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -157,6 +157,7 @@ import org.apache.doris.load.loadv2.LoadEtlChecker; import org.apache.doris.load.loadv2.LoadJobScheduler; import org.apache.doris.load.loadv2.LoadLoadingChecker; import org.apache.doris.load.loadv2.LoadManager; +import org.apache.doris.load.loadv2.ProgressManager; import org.apache.doris.load.routineload.RoutineLoadManager; import org.apache.doris.load.routineload.RoutineLoadScheduler; import org.apache.doris.load.routineload.RoutineLoadTaskScheduler; @@ -312,6 +313,7 @@ public class Env { private GlobalFunctionMgr globalFunctionMgr; private Load load; private LoadManager loadManager; + private ProgressManager progressManager; private StreamLoadRecordMgr streamLoadRecordMgr; private RoutineLoadManager routineLoadManager; private SqlBlockRuleMgr sqlBlockRuleMgr; @@ -629,6 +631,7 @@ public class Env { this.loadJobScheduler = new LoadJobScheduler(); this.loadManager = new LoadManager(loadJobScheduler); + this.progressManager = new ProgressManager(); this.streamLoadRecordMgr = new StreamLoadRecordMgr("stream_load_record_manager", Config.fetch_stream_load_record_interval_second * 1000L); this.loadEtlChecker = new LoadEtlChecker(loadManager); @@ -3561,6 +3564,14 @@ public class Env { return loadManager; } + public ProgressManager getProgressManager() { + return progressManager; + } + + public static ProgressManager getCurrentProgressManager() { + return getCurrentEnv().getProgressManager(); + } + public StreamLoadRecordMgr getStreamLoadRecordMgr() { return streamLoadRecordMgr; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java index 
bfdce1dc8f..666228ad4a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java @@ -190,6 +190,8 @@ public class BrokerLoadJob extends BulkLoadJob { // divide job into broker loading task by table List newLoadingTasks = Lists.newArrayList(); this.jobProfile = new Profile("BrokerLoadJob " + id + ". " + label, true); + ProgressManager progressManager = Env.getCurrentProgressManager(); + progressManager.registerProgressSimple(String.valueOf(id)); MetaLockUtils.readLockTables(tableList); try { for (Map.Entry> entry @@ -215,7 +217,6 @@ public class BrokerLoadJob extends BulkLoadJob { // use newLoadingTasks to save new created loading tasks and submit them later. newLoadingTasks.add(task); // load id will be added to loadStatistic when executing this task - // save all related tables and rollups in transaction state TransactionState txnState = Env.getCurrentGlobalTransactionMgr() .getTransactionState(dbId, transactionId); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java index 9c1fb49566..1089e01059 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java @@ -760,18 +760,20 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements jobInfo.add(state.name()); // progress + // check null + String progress = Env.getCurrentProgressManager().getProgressInfo(String.valueOf(id)); switch (state) { case PENDING: - jobInfo.add("ETL:0%; LOAD:0%"); + jobInfo.add("0%"); break; case CANCELLED: - jobInfo.add("ETL:N/A; LOAD:N/A"); + jobInfo.add(progress); break; case ETL: - jobInfo.add("ETL:" + progress + "%; LOAD:0%"); + jobInfo.add(progress); break; default: - jobInfo.add("ETL:100%; LOAD:" + progress + "%"); + jobInfo.add(progress); break; } diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/ProgressManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/ProgressManager.java new file mode 100644 index 0000000000..2453af3788 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/ProgressManager.java @@ -0,0 +1,129 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.load.loadv2; + +import org.apache.doris.thrift.TUniqueId; + +import com.google.common.collect.HashBasedTable; +import com.google.common.collect.Maps; +import com.google.common.collect.Table; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.Map; + +/** + * ProgressManager manage the progress of loading and exporting tasks + */ +public class ProgressManager { + private static final Logger LOG = LogManager.getLogger(ProgressManager.class); + + private Map idToProgress = Maps.newConcurrentMap(); + + public void registerProgress(String id, int scannerNum) { + LOG.debug("create {} with initial scannerNum {}", id, scannerNum); + idToProgress.remove(id); + idToProgress.put(id, new Progress(scannerNum)); + } + + public void registerProgressSimple(String id) { + registerProgress(id, 0); + } + + public void updateProgress(String id, TUniqueId queryId, TUniqueId fragmentId, int finishedScannerNum) { + Progress progress = idToProgress.get(id); + if (progress != null) { + progress.updateFinishedScanNums(queryId, fragmentId, finishedScannerNum); + } else { + LOG.warn("progress[" + id + "] missing meta information"); + } + } + + public void addTotalScanNums(String id, int num) { + Progress progress = idToProgress.get(id); + if (progress != null) { + progress.addTotalScanNums(num); + } + } + + public String getProgressInfo(String id) { + String progressInfo = "Unknown id: " + id; + Progress progress = idToProgress.get(id); + if (progress != null) { + int finish = progress.getFinishedScanNums(); + int total = progress.getTotalScanNums(); + String currentProgress = String.format("%.2f", progress.getProgress()); + progressInfo = currentProgress + "% (" + finish + "/" + total + ")"; + } + return progressInfo; + } + + static class Progress { + // one job have multiple query, and the query can be divided into + // separate fragments. 
finished scan ranges reported from BE is bound + // to the query, so we need to store them all to save status. + // table: queryId -> fragmentId -> scan ranges + private Table finishedScanNums = HashBasedTable.create(); + private int totalScanNums = 0; + + public synchronized void addTotalScanNums(int num) { + totalScanNums += num; + } + + public synchronized void updateFinishedScanNums(TUniqueId queryId, TUniqueId fragmentId, int finishedScanNum) { + finishedScanNums.put(queryId, fragmentId, finishedScanNum); + } + + public int getTotalScanNums() { + return totalScanNums; + } + + public int getFinishedScanNums() { + int result = 0; + for (Integer v : finishedScanNums.values()) { + result += v; + } + return result; + } + + public double getProgress() { + // if no scan range found, the progress should be finished(100%) + if (totalScanNums == 0) { + return 100.0; + } + return getFinishedScanNums() * 100 / (double) totalScanNums; + } + + public Progress(int totalScanNums) { + this.totalScanNums = totalScanNums; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("Finished/Total: "); + sb.append(getFinishedScanNums()); + sb.append("/"); + sb.append(totalScanNums); + sb.append(" => "); + sb.append(getProgress()); + sb.append("%"); + return sb.toString(); + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 7becc8d35c..46c4a21758 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -75,12 +75,15 @@ import org.apache.doris.system.Backend; import org.apache.doris.system.SystemInfoService; import org.apache.doris.task.LoadEtlTask; import org.apache.doris.thrift.PaloInternalServiceVersion; +import org.apache.doris.thrift.TBrokerScanRange; import org.apache.doris.thrift.TDescriptorTable; import org.apache.doris.thrift.TErrorTabletInfo; 
import org.apache.doris.thrift.TEsScanRange; import org.apache.doris.thrift.TExecPlanFragmentParams; import org.apache.doris.thrift.TExecPlanFragmentParamsList; +import org.apache.doris.thrift.TExternalScanRange; import org.apache.doris.thrift.TFileRangeDesc; +import org.apache.doris.thrift.TFileScanRange; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TPaloScanRange; import org.apache.doris.thrift.TPipelineFragmentParams; @@ -96,6 +99,7 @@ import org.apache.doris.thrift.TReportExecStatusParams; import org.apache.doris.thrift.TResourceLimit; import org.apache.doris.thrift.TRuntimeFilterParams; import org.apache.doris.thrift.TRuntimeFilterTargetParamsV2; +import org.apache.doris.thrift.TScanRange; import org.apache.doris.thrift.TScanRangeLocation; import org.apache.doris.thrift.TScanRangeLocations; import org.apache.doris.thrift.TScanRangeParams; @@ -205,6 +209,7 @@ public class Coordinator { private final List needCheckPipelineExecContexts = Lists.newArrayList(); private ResultReceiver receiver; private final List scanNodes; + private int scanRangeNum = 0; // number of instances of this query, equals to // number of backends executing plan fragments on behalf of this query; // set in computeFragmentExecParams(); @@ -409,6 +414,10 @@ public class Coordinator { return queryId; } + public int getScanRangeNum() { + return scanRangeNum; + } + public void setQueryId(TUniqueId queryId) { this.queryId = queryId; } @@ -606,6 +615,7 @@ public class Coordinator { List relatedBackendIds = Lists.newArrayList(addressToBackendID.values()); Env.getCurrentEnv().getLoadManager().initJobProgress(jobId, queryId, instanceIds, relatedBackendIds); + Env.getCurrentEnv().getProgressManager().addTotalScanNums(String.valueOf(jobId), scanRangeNum); LOG.info("dispatch load job: {} to {}", DebugUtil.printId(queryId), addressToBackendID.keySet()); } executionProfile.markInstances(instanceIds); @@ -1934,6 +1944,7 @@ public class Coordinator { TScanRangeParams 
scanRangeParams = new TScanRangeParams(); scanRangeParams.scan_range = location.scan_range; scanRangeParamsList.add(scanRangeParams); + updateScanRangeNumByScanRange(scanRangeParams); } } } @@ -2043,6 +2054,7 @@ public class Coordinator { scanRangeParams.scan_range = scanRangeLocations.scan_range; scanRangeParams.setVolumeId(minLocation.volume_id); scanRangeParamsList.add(scanRangeParams); + updateScanRangeNumByScanRange(scanRangeParams); } } @@ -2076,9 +2088,34 @@ public class Coordinator { // Volume is optional, so we need to set the value and the is-set bit scanRangeParams.setVolumeId(minLocation.volume_id); scanRangeParamsList.add(scanRangeParams); + updateScanRangeNumByScanRange(scanRangeParams); } } + private void updateScanRangeNumByScanRange(TScanRangeParams param) { + TScanRange scanRange = param.getScanRange(); + if (scanRange == null) { + return; + } + TBrokerScanRange brokerScanRange = scanRange.getBrokerScanRange(); + if (brokerScanRange != null) { + scanRangeNum += brokerScanRange.getRanges().size(); + } + TExternalScanRange externalScanRange = scanRange.getExtScanRange(); + if (externalScanRange != null) { + TFileScanRange fileScanRange = externalScanRange.getFileScanRange(); + if (fileScanRange != null) { + scanRangeNum += fileScanRange.getRanges().size(); + } + } + TPaloScanRange paloScanRange = scanRange.getPaloScanRange(); + if (paloScanRange != null) { + scanRangeNum = scanRangeNum + 1; + } + // TODO: more ranges? 
+ } + + // update job progress from BE public void updateFragmentExecStatus(TReportExecStatusParams params) { if (enablePipelineEngine) { PipelineExecContext ctx = pipelineExecContexts.get(Pair.of(params.getFragmentId(), params.getBackendId())); @@ -2127,12 +2164,6 @@ public class Coordinator { } executionProfile.markOneInstanceDone(params.getFragmentInstanceId()); } - - if (params.isSetLoadedRows()) { - Env.getCurrentEnv().getLoadManager().updateJobProgress( - jobId, params.getBackendId(), params.getQueryId(), params.getFragmentInstanceId(), - params.getLoadedRows(), params.getLoadedBytes(), params.isDone()); - } } else { if (params.backend_num >= backendExecStates.size()) { LOG.warn("unknown backend number: {}, expected less than: {}", @@ -2185,12 +2216,14 @@ public class Coordinator { } executionProfile.markOneInstanceDone(params.getFragmentInstanceId()); } + } - if (params.isSetLoadedRows()) { - Env.getCurrentEnv().getLoadManager().updateJobProgress( - jobId, params.getBackendId(), params.getQueryId(), params.getFragmentInstanceId(), - params.getLoadedRows(), params.getLoadedBytes(), params.isDone()); - } + if (params.isSetLoadedRows()) { + Env.getCurrentEnv().getLoadManager().updateJobProgress( + jobId, params.getBackendId(), params.getQueryId(), params.getFragmentInstanceId(), + params.getLoadedRows(), params.getLoadedBytes(), params.isDone()); + Env.getCurrentEnv().getProgressManager().updateProgress(String.valueOf(jobId), + params.getQueryId(), params.getFragmentInstanceId(), params.getFinishedScanRanges()); } } @@ -2419,6 +2452,7 @@ public class Coordinator { TScanRangeParams scanRangeParams = new TScanRangeParams(); scanRangeParams.scan_range = location.scan_range; scanRangeParamsList.add(scanRangeParams); + updateScanRangeNumByScanRange(scanRangeParams); } } } diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index d2b13bbb43..002f952b0d 100644 --- a/gensrc/thrift/FrontendService.thrift +++ 
b/gensrc/thrift/FrontendService.thrift @@ -433,6 +433,8 @@ struct TReportExecStatusParams { 20: optional PaloInternalService.TQueryType query_type 21: optional RuntimeProfile.TRuntimeProfileTree loadChannelProfile + + 22: optional i32 finished_scan_ranges } struct TFeResult {