From e686e85f273da5475886ca159893834d0e079bd0 Mon Sep 17 00:00:00 2001 From: Ashin Gau Date: Mon, 1 Jul 2024 22:05:25 +0800 Subject: [PATCH] [opt](split) add max wait time of getting splits (#36842) bp: #36843 --- be/src/pipeline/exec/file_scan_operator.cpp | 3 ++- be/src/vec/exec/scan/new_file_scan_node.cpp | 3 ++- .../vec/exec/scan/split_source_connector.cpp | 2 +- be/src/vec/exec/scan/split_source_connector.h | 15 +++++++-------- be/src/vec/exec/scan/vfile_scanner.cpp | 3 --- be/src/vec/exec/scan/vfile_scanner.h | 1 - .../doris/datasource/FileQueryScanNode.java | 3 ++- .../apache/doris/datasource/SplitSource.java | 18 ++++++++++-------- .../org/apache/doris/qe/SessionVariable.java | 17 +++++++++++++++++ 9 files changed, 41 insertions(+), 24 deletions(-) diff --git a/be/src/pipeline/exec/file_scan_operator.cpp b/be/src/pipeline/exec/file_scan_operator.cpp index 9b2eeb9b28..6182d35b97 100644 --- a/be/src/pipeline/exec/file_scan_operator.cpp +++ b/be/src/pipeline/exec/file_scan_operator.cpp @@ -71,8 +71,9 @@ void FileScanLocalState::set_scan_ranges(RuntimeState* state, auto scan_range = scan_ranges[0].scan_range.ext_scan_range.file_scan_range; if (scan_range.__isset.split_source) { auto split_source = scan_range.split_source; + RuntimeProfile::Counter* get_split_timer = ADD_TIMER(_runtime_profile, "GetSplitTime"); _split_source = std::make_shared( - state, split_source.split_source_id, split_source.num_splits); + state, get_split_timer, split_source.split_source_id, split_source.num_splits); } } if (_split_source == nullptr) { diff --git a/be/src/vec/exec/scan/new_file_scan_node.cpp b/be/src/vec/exec/scan/new_file_scan_node.cpp index d0b0f3cce9..33ed88f6cd 100644 --- a/be/src/vec/exec/scan/new_file_scan_node.cpp +++ b/be/src/vec/exec/scan/new_file_scan_node.cpp @@ -74,8 +74,9 @@ void NewFileScanNode::set_scan_ranges(RuntimeState* state, auto scan_range = scan_ranges[0].scan_range.ext_scan_range.file_scan_range; if (scan_range.__isset.split_source) { auto split_source = scan_range.split_source; + RuntimeProfile::Counter* get_split_timer = ADD_TIMER(_runtime_profile, "GetSplitTime"); _split_source = std::make_shared( - state, split_source.split_source_id, split_source.num_splits); + state, get_split_timer, split_source.split_source_id, split_source.num_splits); } } if (_split_source == nullptr) { diff --git a/be/src/vec/exec/scan/split_source_connector.cpp b/be/src/vec/exec/scan/split_source_connector.cpp index fae65543e5..9bba44b4e7 100644 --- a/be/src/vec/exec/scan/split_source_connector.cpp +++ b/be/src/vec/exec/scan/split_source_connector.cpp @@ -45,7 +45,7 @@ Status RemoteSplitSourceConnector::get_next(bool* has_next, TFileRangeDesc* rang std::lock_guard l(_range_lock); *has_next = false; if (_scan_index == _scan_ranges.size() && !_last_batch) { - SCOPED_RAW_TIMER(&_get_split_timer); + SCOPED_TIMER(_get_split_timer); Status coord_status; FrontendServiceConnection coord(_state->exec_env()->frontend_client_cache(), _state->get_query_ctx()->coord_addr, &coord_status); diff --git a/be/src/vec/exec/scan/split_source_connector.h b/be/src/vec/exec/scan/split_source_connector.h index bfda961df3..f62b45612b 100644 --- a/be/src/vec/exec/scan/split_source_connector.h +++ b/be/src/vec/exec/scan/split_source_connector.h @@ -43,8 +43,6 @@ public: virtual int num_scan_ranges() = 0; virtual TFileScanRangeParams* get_params() = 0; - - virtual int64_t get_split_time() { return 0; } }; /** @@ -89,6 +87,7 @@ class RemoteSplitSourceConnector : public SplitSourceConnector { private: std::mutex _range_lock; RuntimeState* _state; + RuntimeProfile::Counter* _get_split_timer; int64 _split_source_id; int _num_splits; @@ -97,11 +96,13 @@ private: int _scan_index = 0; int _range_index = 0; - int64_t _get_split_timer = 0; - public: - RemoteSplitSourceConnector(RuntimeState* state, int64 split_source_id, int num_splits) - : _state(state), _split_source_id(split_source_id), _num_splits(num_splits) {} + RemoteSplitSourceConnector(RuntimeState* state, RuntimeProfile::Counter* get_split_timer, + int64 split_source_id, int num_splits) + : _state(state), + _get_split_timer(get_split_timer), + _split_source_id(split_source_id), + _num_splits(num_splits) {} Status get_next(bool* has_next, TFileRangeDesc* range) override; @@ -114,8 +115,6 @@ public: TFileScanRangeParams* get_params() override { LOG(FATAL) << "Unreachable, params is got by file_scan_range_params_map"; } - - int64_t get_split_time() override { return _get_split_timer; } }; } // namespace doris::vectorized diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp index 937258129a..dcfb404ae5 100644 --- a/be/src/vec/exec/scan/vfile_scanner.cpp +++ b/be/src/vec/exec/scan/vfile_scanner.cpp @@ -165,7 +165,6 @@ Status VFileScanner::prepare( _file_counter = ADD_COUNTER(_parent->_scanner_profile, "FileNumber", TUnit::UNIT); _has_fully_rf_file_counter = ADD_COUNTER(_parent->_scanner_profile, "HasFullyRfFileNumber", TUnit::UNIT); - _get_split_timer = ADD_TIMER(_parent->_scanner_profile, "GetSplitTime"); } else { _get_block_timer = ADD_TIMER(_local_state->scanner_profile(), "FileScannerGetBlockTime"); _open_reader_timer = @@ -184,7 +183,6 @@ Status VFileScanner::prepare( _file_counter = ADD_COUNTER(_local_state->scanner_profile(), "FileNumber", TUnit::UNIT); _has_fully_rf_file_counter = ADD_COUNTER(_local_state->scanner_profile(), "HasFullyRfFileNumber", TUnit::UNIT); - _get_split_timer = ADD_TIMER(_local_state->scanner_profile(), "GetSplitTime"); } _file_cache_statistics.reset(new io::FileCacheStatistics()); @@ -1197,7 +1195,6 @@ Status VFileScanner::close(RuntimeState* state) { if (_cur_reader) { RETURN_IF_ERROR(_cur_reader->close()); } - COUNTER_UPDATE(_get_split_timer, _split_source->get_split_time()); RETURN_IF_ERROR(VScanner::close(state)); return Status::OK(); diff --git a/be/src/vec/exec/scan/vfile_scanner.h b/be/src/vec/exec/scan/vfile_scanner.h index a54e06354c..43c1a8b13d 100644 --- a/be/src/vec/exec/scan/vfile_scanner.h +++ b/be/src/vec/exec/scan/vfile_scanner.h @@ -184,7 +184,6 @@ private: RuntimeProfile::Counter* _empty_file_counter = nullptr; RuntimeProfile::Counter* _file_counter = nullptr; RuntimeProfile::Counter* _has_fully_rf_file_counter = nullptr; - RuntimeProfile::Counter* _get_split_timer = nullptr; const std::unordered_map* _col_name_to_slot_id = nullptr; // single slot filter conjuncts diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java index ff61c5ee13..463cf629b4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java @@ -337,10 +337,11 @@ public abstract class FileQueryScanNode extends FileScanNode { locationType = getLocationType(fileSplit.getPath().toString()); } totalFileSize = fileSplit.getLength() * inputSplitsNum; + long maxWaitTime = ConnectContext.get().getSessionVariable().getFetchSplitsMaxWaitTime(); // Not accurate, only used to estimate concurrency. int numSplitsPerBE = numApproximateSplits() / backendPolicy.numBackends(); for (Backend backend : backendPolicy.getBackends()) { - SplitSource splitSource = new SplitSource(backend, splitAssignment); + SplitSource splitSource = new SplitSource(backend, splitAssignment, maxWaitTime); splitSources.add(splitSource); Env.getCurrentEnv().getSplitSourceManager().registerSplitSource(splitSource); TScanRangeLocations curLocations = newLocations(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitSource.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitSource.java index dce135292e..8515e686f3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitSource.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitSource.java @@ -44,17 +44,18 @@ import java.util.concurrent.atomic.AtomicLong; public class SplitSource { private static final AtomicLong UNIQUE_ID_GENERATOR = new AtomicLong(0); private static final long WAIT_TIME_OUT = 100; // 100ms - private static final long MAX_WAIT_TIME_OUT = 500; // 500ms private final long uniqueId; private final Backend backend; private final SplitAssignment splitAssignment; private final AtomicBoolean isLastBatch; + private final long maxWaitTime; - public SplitSource(Backend backend, SplitAssignment splitAssignment) { + public SplitSource(Backend backend, SplitAssignment splitAssignment, long maxWaitTime) { this.uniqueId = UNIQUE_ID_GENERATOR.getAndIncrement(); this.backend = backend; this.splitAssignment = splitAssignment; + this.maxWaitTime = maxWaitTime; this.isLastBatch = new AtomicBoolean(false); splitAssignment.registerSource(uniqueId); } @@ -71,7 +72,7 @@ public class SplitSource { return Collections.emptyList(); } List scanRanges = Lists.newArrayListWithExpectedSize(maxBatchSize); - long maxTimeOut = 0; + long startTime = System.currentTimeMillis(); while (scanRanges.size() < maxBatchSize) { BlockingQueue> splits = splitAssignment.getAssignedSplits(backend); if (splits == null) { @@ -81,18 +82,19 @@ public class SplitSource { while (scanRanges.size() < maxBatchSize) { try { Collection splitCollection = splits.poll(WAIT_TIME_OUT, TimeUnit.MILLISECONDS); + if (splitCollection != null) { + scanRanges.addAll(splitCollection); + } + if (!scanRanges.isEmpty() && System.currentTimeMillis() - startTime > maxWaitTime) { + return scanRanges; + } if (splitCollection == null) { - maxTimeOut += WAIT_TIME_OUT; break; } - scanRanges.addAll(splitCollection); } catch (InterruptedException e) { throw new UserException("Failed to get next batch of splits", e); } } - if (maxTimeOut >= MAX_WAIT_TIME_OUT && !scanRanges.isEmpty()) { - break; - } } return scanRanges; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 052cc0c190..d6e75faf67 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -419,6 +419,8 @@ public class SessionVariable implements Serializable, Writable { public static final String NUM_PARTITIONS_IN_BATCH_MODE = "num_partitions_in_batch_mode"; + public static final String FETCH_SPLITS_MAX_WAIT_TIME = "fetch_splits_max_wait_time_ms"; + /** * use insert stmt as the unified backend for all loads */ @@ -1471,6 +1473,13 @@ public class SessionVariable implements Serializable, Writable { needForward = true) public int numPartitionsInBatchMode = 1024; + @VariableMgr.VarAttr( + name = FETCH_SPLITS_MAX_WAIT_TIME, + description = {"batch方式中BE获取splits的最大等待时间", + "The max wait time of getting splits in batch mode."}, + needForward = true) + public long fetchSplitsMaxWaitTime = 4000; + @VariableMgr.VarAttr( name = ENABLE_PARQUET_LAZY_MAT, description = {"控制 parquet reader 是否启用延迟物化技术。默认为 true。", @@ -2731,6 +2740,14 @@ public class SessionVariable implements Serializable, Writable { this.numPartitionsInBatchMode = numPartitionsInBatchMode; } + public long getFetchSplitsMaxWaitTime() { + return fetchSplitsMaxWaitTime; + } + + public void setFetchSplitsMaxWaitTime(long fetchSplitsMaxWaitTime) { + this.fetchSplitsMaxWaitTime = fetchSplitsMaxWaitTime; + } + public boolean isEnableParquetLazyMat() { return enableParquetLazyMat; }