Load progress: report finished/total scan ranges for load jobs

BE counts finished scan ranges per fragment instance and reports the count in
TReportExecStatusParams.finished_scan_ranges. FE keeps the latest count per
(query, fragment instance) in ProgressManager, sums them, and puts
"<percent>% (<finished>/<total>)" into the load job info.

Review changes folded into this revision:
  * ProgressManager.Progress: every accessor is synchronized. The backing
    HashBasedTable is not thread-safe and was read without the lock that the
    writers hold. getProgressInfo() now reads percent/finished/total under one
    lock so the three values agree.
  * ProgressManager.registerProgress: dropped the remove() before put().
  * Progress.getProgress: multiply in double instead of int.
  * Coordinator: finished_scan_ranges is forwarded only when the field is set,
    and range lists are checked with isSetRanges() before size().
  * VFileScanner::_get_next_reader: a scanner that was given no ranges no longer
    reports one finished range.

Not addressed: nothing removes entries from ProgressManager, so the map keeps
one entry per registered job.

Text recovery: this patch was restored from a whitespace-collapsed copy.
Indentation and blank context lines are reconstructed, so apply with
whitespace tolerance. Type arguments lost from context lines (e.g. "List",
"std::atomic", "Map.Entry>") are left as found. "index" hashes are those of the
original patch.

diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 7e59123c20..0cca7fa10f 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -388,6 +388,7 @@ void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) {
     exec_status.set_t_status(&params);
     params.__set_done(req.done);
     params.__set_query_type(req.runtime_state->query_type());
+    params.__set_finished_scan_ranges(req.runtime_state->num_finished_range());
 
     DCHECK(req.runtime_state != nullptr);
     if (req.runtime_state->query_type() == TQueryType::LOAD && !req.done && req.status.ok()) {
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index 57fb9e8972..dd6ee00399 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -60,6 +60,7 @@ RuntimeState::RuntimeState(const TUniqueId& fragment_instance_id,
           _num_rows_load_unselected(0),
           _num_print_error_rows(0),
           _num_bytes_load_total(0),
+          _num_finished_scan_range(0),
           _load_job_id(-1),
           _normal_row_number(0),
           _error_row_number(0),
@@ -86,6 +87,7 @@ RuntimeState::RuntimeState(const TPlanFragmentExecParams& fragment_exec_params,
           _num_rows_load_unselected(0),
           _num_print_error_rows(0),
           _num_bytes_load_total(0),
+          _num_finished_scan_range(0),
           _normal_row_number(0),
           _error_row_number(0),
           _error_log_file_path(""),
@@ -115,6 +117,7 @@ RuntimeState::RuntimeState(const TPipelineInstanceParams& pipeline_params,
           _num_rows_load_unselected(0),
           _num_print_error_rows(0),
           _num_bytes_load_total(0),
+          _num_finished_scan_range(0),
           _normal_row_number(0),
           _error_row_number(0),
           _error_log_file(nullptr) {
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index f2b50bd92f..6def2cc4e5 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -221,6 +221,8 @@ public:
 
     int64_t num_bytes_load_total() { return _num_bytes_load_total.load(); }
 
+    int64_t num_finished_range() { return _num_finished_scan_range.load(); }
+
     int64_t num_rows_load_total() { return _num_rows_load_total.load(); }
 
     int64_t num_rows_load_filtered() { return _num_rows_load_filtered.load(); }
@@ -239,6 +241,10 @@ public:
         _num_bytes_load_total.fetch_add(bytes_load);
     }
 
+    void update_num_finished_scan_range(int64_t finished_range) {
+        _num_finished_scan_range.fetch_add(finished_range);
+    }
+
     void update_num_rows_load_filtered(int64_t num_rows) {
         _num_rows_load_filtered.fetch_add(num_rows);
     }
@@ -466,6 +472,7 @@ private:
     std::atomic _num_print_error_rows;
 
     std::atomic _num_bytes_load_total; // total bytes read from source
+    std::atomic<int64_t> _num_finished_scan_range;
 
     std::vector _export_output_files;
     std::string _import_label;
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp
index caf71684c2..d8db35adac 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -563,8 +563,15 @@ Status VFileScanner::_get_next_reader() {
         _src_block_init = false;
+        // A non-zero cursor means an earlier call already handed out a range, which
+        // is finished now. Count it here, before the EOF check, so that a scanner
+        // that was given no ranges at all does not report a finished range.
+        if (_next_range != 0) {
+            _state->update_num_finished_scan_range(1);
+        }
         if (_next_range >= _ranges.size()) {
             _scanner_eof = true;
             return Status::OK();
         }
+
         const TFileRangeDesc& range = _ranges[_next_range++];
 
         // create reader for specific format
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 920be8a5ac..ee6b25096e 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -157,6 +157,7 @@ import org.apache.doris.load.loadv2.LoadEtlChecker;
 import org.apache.doris.load.loadv2.LoadJobScheduler;
 import org.apache.doris.load.loadv2.LoadLoadingChecker;
 import org.apache.doris.load.loadv2.LoadManager;
+import org.apache.doris.load.loadv2.ProgressManager;
 import org.apache.doris.load.routineload.RoutineLoadManager;
 import org.apache.doris.load.routineload.RoutineLoadScheduler;
 import org.apache.doris.load.routineload.RoutineLoadTaskScheduler;
@@ -312,6 +313,7 @@ public class Env {
     private GlobalFunctionMgr globalFunctionMgr;
     private Load load;
     private LoadManager loadManager;
+    private ProgressManager progressManager;
     private StreamLoadRecordMgr streamLoadRecordMgr;
     private RoutineLoadManager routineLoadManager;
     private SqlBlockRuleMgr sqlBlockRuleMgr;
@@ -629,6 +631,7 @@ public class Env {
 
         this.loadJobScheduler = new LoadJobScheduler();
         this.loadManager = new LoadManager(loadJobScheduler);
+        this.progressManager = new ProgressManager();
         this.streamLoadRecordMgr = new StreamLoadRecordMgr("stream_load_record_manager",
                 Config.fetch_stream_load_record_interval_second * 1000L);
         this.loadEtlChecker = new LoadEtlChecker(loadManager);
@@ -3561,6 +3564,14 @@ public class Env {
         return loadManager;
     }
 
+    public ProgressManager getProgressManager() {
+        return progressManager;
+    }
+
+    public static ProgressManager getCurrentProgressManager() {
+        return getCurrentEnv().getProgressManager();
+    }
+
     public StreamLoadRecordMgr getStreamLoadRecordMgr() {
         return streamLoadRecordMgr;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
index bfdce1dc8f..666228ad4a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
@@ -190,6 +190,8 @@ public class BrokerLoadJob extends BulkLoadJob {
         // divide job into broker loading task by table
         List newLoadingTasks = Lists.newArrayList();
         this.jobProfile = new Profile("BrokerLoadJob " + id + ". " + label, true);
+        ProgressManager progressManager = Env.getCurrentProgressManager();
+        progressManager.registerProgressSimple(String.valueOf(id));
         MetaLockUtils.readLockTables(tableList);
         try {
             for (Map.Entry> entry
@@ -215,7 +217,6 @@ public class BrokerLoadJob extends BulkLoadJob {
                 // use newLoadingTasks to save new created loading tasks and submit them later.
                 newLoadingTasks.add(task);
                 // load id will be added to loadStatistic when executing this task
-
                 // save all related tables and rollups in transaction state
                 TransactionState txnState = Env.getCurrentGlobalTransactionMgr()
                         .getTransactionState(dbId, transactionId);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
index 9c1fb49566..1089e01059 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
@@ -760,18 +760,20 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
         jobInfo.add(state.name());
 
         // progress
+        // getProgressInfo() never returns null; an unregistered job id yields "Unknown id: <id>"
+        String progress = Env.getCurrentProgressManager().getProgressInfo(String.valueOf(id));
         switch (state) {
             case PENDING:
-                jobInfo.add("ETL:0%; LOAD:0%");
+                jobInfo.add("0%");
                 break;
             case CANCELLED:
-                jobInfo.add("ETL:N/A; LOAD:N/A");
+                jobInfo.add(progress);
                 break;
             case ETL:
-                jobInfo.add("ETL:" + progress + "%; LOAD:0%");
+                jobInfo.add(progress);
                 break;
             default:
-                jobInfo.add("ETL:100%; LOAD:" + progress + "%");
+                jobInfo.add(progress);
                 break;
         }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/ProgressManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/ProgressManager.java
new file mode 100644
index 0000000000..2453af3788
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/ProgressManager.java
@@ -0,0 +1,149 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.loadv2;
+
+import org.apache.doris.thrift.TUniqueId;
+
+import com.google.common.collect.HashBasedTable;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Table;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Map;
+
+/**
+ * ProgressManager tracks the progress of load and export jobs as the number of
+ * finished scan ranges out of the total number of scan ranges.
+ */
+public class ProgressManager {
+    private static final Logger LOG = LogManager.getLogger(ProgressManager.class);
+
+    private final Map<String, Progress> idToProgress = Maps.newConcurrentMap();
+
+    /**
+     * Starts tracking {@code id} with {@code scannerNum} total scan ranges,
+     * replacing any progress previously registered under the same id.
+     */
+    public void registerProgress(String id, int scannerNum) {
+        LOG.debug("create {} with initial scannerNum {}", id, scannerNum);
+        idToProgress.put(id, new Progress(scannerNum));
+    }
+
+    /** Starts tracking {@code id} with no scan ranges; add them with {@link #addTotalScanNums}. */
+    public void registerProgressSimple(String id) {
+        registerProgress(id, 0);
+    }
+
+    /**
+     * Records the latest finished-scan-range count reported for one fragment of one query.
+     * The value replaces the previous report for that (queryId, fragmentId) pair.
+     * An id that was never registered is logged and otherwise ignored.
+     */
+    public void updateProgress(String id, TUniqueId queryId, TUniqueId fragmentId, int finishedScannerNum) {
+        Progress progress = idToProgress.get(id);
+        if (progress != null) {
+            progress.updateFinishedScanNums(queryId, fragmentId, finishedScannerNum);
+        } else {
+            LOG.warn("progress[{}] missing meta information", id);
+        }
+    }
+
+    /** Adds {@code num} to the total scan ranges of {@code id}; ignored if the id is not registered. */
+    public void addTotalScanNums(String id, int num) {
+        Progress progress = idToProgress.get(id);
+        if (progress != null) {
+            progress.addTotalScanNums(num);
+        }
+    }
+
+    /**
+     * Returns the progress of {@code id} formatted as {@code "<percent>% (<finished>/<total>)"},
+     * or {@code "Unknown id: <id>"} when the id is not registered. Never returns null.
+     */
+    public String getProgressInfo(String id) {
+        Progress progress = idToProgress.get(id);
+        if (progress == null) {
+            return "Unknown id: " + id;
+        }
+        // Hold the monitor across all three reads so the percentage and the
+        // finished/total pair come from the same snapshot.
+        synchronized (progress) {
+            int finish = progress.getFinishedScanNums();
+            int total = progress.getTotalScanNums();
+            String currentProgress = String.format("%.2f", progress.getProgress());
+            return currentProgress + "% (" + finish + "/" + total + ")";
+        }
+    }
+
+    static class Progress {
+        // A job may run several queries, and each query is split into fragments.
+        // Finished scan ranges are reported per fragment of a query, so the latest
+        // value is kept for every (query, fragment) pair and summed on demand.
+        // table: queryId -> fragmentId -> finished scan ranges
+        // HashBasedTable is not thread-safe, so every access goes through a synchronized method.
+        private final Table<TUniqueId, TUniqueId, Integer> finishedScanNums = HashBasedTable.create();
+        private int totalScanNums = 0;
+
+        public synchronized void addTotalScanNums(int num) {
+            totalScanNums += num;
+        }
+
+        public synchronized void updateFinishedScanNums(TUniqueId queryId, TUniqueId fragmentId, int finishedScanNum) {
+            finishedScanNums.put(queryId, fragmentId, finishedScanNum);
+        }
+
+        public synchronized int getTotalScanNums() {
+            return totalScanNums;
+        }
+
+        public synchronized int getFinishedScanNums() {
+            int result = 0;
+            for (Integer v : finishedScanNums.values()) {
+                result += v;
+            }
+            return result;
+        }
+
+        public synchronized double getProgress() {
+            // if no scan range found, the progress should be finished(100%)
+            if (totalScanNums == 0) {
+                return 100.0;
+            }
+            // Multiply in double so a large finished count cannot overflow int.
+            return 100.0 * getFinishedScanNums() / totalScanNums;
+        }
+
+        public Progress(int totalScanNums) {
+            this.totalScanNums = totalScanNums;
+        }
+
+        @Override
+        public synchronized String toString() {
+            StringBuilder sb = new StringBuilder();
+            sb.append("Finished/Total: ");
+            sb.append(getFinishedScanNums());
+            sb.append("/");
+            sb.append(totalScanNums);
+            sb.append(" => ");
+            sb.append(getProgress());
+            sb.append("%");
+            return sb.toString();
+        }
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 7becc8d35c..46c4a21758 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -75,12 +75,15 @@ import org.apache.doris.system.Backend;
 import org.apache.doris.system.SystemInfoService;
 import org.apache.doris.task.LoadEtlTask;
 import org.apache.doris.thrift.PaloInternalServiceVersion;
+import org.apache.doris.thrift.TBrokerScanRange;
 import org.apache.doris.thrift.TDescriptorTable;
 import org.apache.doris.thrift.TErrorTabletInfo;
 import org.apache.doris.thrift.TEsScanRange;
 import org.apache.doris.thrift.TExecPlanFragmentParams;
 import org.apache.doris.thrift.TExecPlanFragmentParamsList;
+import org.apache.doris.thrift.TExternalScanRange;
 import org.apache.doris.thrift.TFileRangeDesc;
+import org.apache.doris.thrift.TFileScanRange;
 import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TPaloScanRange;
 import org.apache.doris.thrift.TPipelineFragmentParams;
@@ -96,6 +99,7 @@ import org.apache.doris.thrift.TReportExecStatusParams;
 import org.apache.doris.thrift.TResourceLimit;
 import org.apache.doris.thrift.TRuntimeFilterParams;
 import org.apache.doris.thrift.TRuntimeFilterTargetParamsV2;
+import org.apache.doris.thrift.TScanRange;
 import org.apache.doris.thrift.TScanRangeLocation;
 import org.apache.doris.thrift.TScanRangeLocations;
 import org.apache.doris.thrift.TScanRangeParams;
@@ -205,6 +209,7 @@ public class Coordinator {
     private final List needCheckPipelineExecContexts = Lists.newArrayList();
     private ResultReceiver receiver;
     private final List scanNodes;
+    private int scanRangeNum = 0;
     // number of instances of this query, equals to
     // number of backends executing plan fragments on behalf of this query;
     // set in computeFragmentExecParams();
@@ -409,6 +414,10 @@ public class Coordinator {
         return queryId;
     }
 
+    public int getScanRangeNum() {
+        return scanRangeNum;
+    }
+
     public void setQueryId(TUniqueId queryId) {
         this.queryId = queryId;
     }
@@ -606,6 +615,7 @@ public class Coordinator {
             List relatedBackendIds = Lists.newArrayList(addressToBackendID.values());
             Env.getCurrentEnv().getLoadManager().initJobProgress(jobId, queryId, instanceIds,
                     relatedBackendIds);
+            Env.getCurrentEnv().getProgressManager().addTotalScanNums(String.valueOf(jobId), scanRangeNum);
             LOG.info("dispatch load job: {} to {}", DebugUtil.printId(queryId), addressToBackendID.keySet());
         }
         executionProfile.markInstances(instanceIds);
@@ -1934,6 +1944,7 @@ public class Coordinator {
                     TScanRangeParams scanRangeParams = new TScanRangeParams();
                     scanRangeParams.scan_range = location.scan_range;
                     scanRangeParamsList.add(scanRangeParams);
+                    updateScanRangeNumByScanRange(scanRangeParams);
                 }
             }
         }
@@ -2043,6 +2054,7 @@ public class Coordinator {
             scanRangeParams.scan_range = scanRangeLocations.scan_range;
             scanRangeParams.setVolumeId(minLocation.volume_id);
             scanRangeParamsList.add(scanRangeParams);
+            updateScanRangeNumByScanRange(scanRangeParams);
         }
     }
 
@@ -2076,9 +2088,39 @@ public class Coordinator {
             // Volume is optional, so we need to set the value and the is-set bit
             scanRangeParams.setVolumeId(minLocation.volume_id);
             scanRangeParamsList.add(scanRangeParams);
+            updateScanRangeNumByScanRange(scanRangeParams);
         }
     }
 
+    /**
+     * Adds the number of scan ranges carried by {@code param} to {@code scanRangeNum}.
+     * Broker and file scan ranges count one per contained range descriptor; a Palo
+     * scan range counts as one. Other scan-range kinds are not counted yet.
+     */
+    private void updateScanRangeNumByScanRange(TScanRangeParams param) {
+        TScanRange scanRange = param.getScanRange();
+        if (scanRange == null) {
+            return;
+        }
+        TBrokerScanRange brokerScanRange = scanRange.getBrokerScanRange();
+        if (brokerScanRange != null && brokerScanRange.isSetRanges()) {
+            scanRangeNum += brokerScanRange.getRanges().size();
+        }
+        TExternalScanRange externalScanRange = scanRange.getExtScanRange();
+        if (externalScanRange != null) {
+            TFileScanRange fileScanRange = externalScanRange.getFileScanRange();
+            if (fileScanRange != null && fileScanRange.isSetRanges()) {
+                scanRangeNum += fileScanRange.getRanges().size();
+            }
+        }
+        TPaloScanRange paloScanRange = scanRange.getPaloScanRange();
+        if (paloScanRange != null) {
+            scanRangeNum += 1;
+        }
+        // TODO: more ranges?
+    }
+
+    // update job progress from BE
     public void updateFragmentExecStatus(TReportExecStatusParams params) {
         if (enablePipelineEngine) {
             PipelineExecContext ctx = pipelineExecContexts.get(Pair.of(params.getFragmentId(), params.getBackendId()));
@@ -2127,12 +2169,6 @@ public class Coordinator {
                 }
                 executionProfile.markOneInstanceDone(params.getFragmentInstanceId());
             }
-
-            if (params.isSetLoadedRows()) {
-                Env.getCurrentEnv().getLoadManager().updateJobProgress(
-                        jobId, params.getBackendId(), params.getQueryId(), params.getFragmentInstanceId(),
-                        params.getLoadedRows(), params.getLoadedBytes(), params.isDone());
-            }
         } else {
             if (params.backend_num >= backendExecStates.size()) {
                 LOG.warn("unknown backend number: {}, expected less than: {}",
@@ -2185,12 +2221,18 @@ public class Coordinator {
                 }
                 executionProfile.markOneInstanceDone(params.getFragmentInstanceId());
             }
+        }
 
-            if (params.isSetLoadedRows()) {
-                Env.getCurrentEnv().getLoadManager().updateJobProgress(
-                        jobId, params.getBackendId(), params.getQueryId(), params.getFragmentInstanceId(),
-                        params.getLoadedRows(), params.getLoadedBytes(), params.isDone());
-            }
+        if (params.isSetLoadedRows()) {
+            Env.getCurrentEnv().getLoadManager().updateJobProgress(
+                    jobId, params.getBackendId(), params.getQueryId(), params.getFragmentInstanceId(),
+                    params.getLoadedRows(), params.getLoadedBytes(), params.isDone());
+            // finished_scan_ranges is a separate optional field: when a report leaves it
+            // unset, the getter's default must not be recorded as the fragment's progress.
+            if (params.isSetFinishedScanRanges()) {
+                Env.getCurrentEnv().getProgressManager().updateProgress(String.valueOf(jobId),
+                        params.getQueryId(), params.getFragmentInstanceId(), params.getFinishedScanRanges());
+            }
         }
     }
 
@@ -2419,6 +2461,7 @@ public class Coordinator {
                     TScanRangeParams scanRangeParams = new TScanRangeParams();
                     scanRangeParams.scan_range = location.scan_range;
                     scanRangeParamsList.add(scanRangeParams);
+                    updateScanRangeNumByScanRange(scanRangeParams);
                 }
             }
         }
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index d2b13bbb43..002f952b0d 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -433,6 +433,8 @@ struct TReportExecStatusParams {
   20: optional PaloInternalService.TQueryType query_type
 
   21: optional RuntimeProfile.TRuntimeProfileTree loadChannelProfile
+
+  22: optional i32 finished_scan_ranges
 }
 
 struct TFeResult {