@@ -71,8 +71,9 @@ void FileScanLocalState::set_scan_ranges(RuntimeState* state,
         auto scan_range = scan_ranges[0].scan_range.ext_scan_range.file_scan_range;
         if (scan_range.__isset.split_source) {
             auto split_source = scan_range.split_source;
+            RuntimeProfile::Counter* get_split_timer = ADD_TIMER(_runtime_profile, "GetSplitTime");
             _split_source = std::make_shared<vectorized::RemoteSplitSourceConnector>(
-                    state, split_source.split_source_id, split_source.num_splits);
+                    state, get_split_timer, split_source.split_source_id, split_source.num_splits);
         }
     }
     if (_split_source == nullptr) {
@@ -74,8 +74,9 @@ void NewFileScanNode::set_scan_ranges(RuntimeState* state,
         auto scan_range = scan_ranges[0].scan_range.ext_scan_range.file_scan_range;
         if (scan_range.__isset.split_source) {
             auto split_source = scan_range.split_source;
+            RuntimeProfile::Counter* get_split_timer = ADD_TIMER(_runtime_profile, "GetSplitTime");
             _split_source = std::make_shared<RemoteSplitSourceConnector>(
-                    state, split_source.split_source_id, split_source.num_splits);
+                    state, get_split_timer, split_source.split_source_id, split_source.num_splits);
         }
     }
     if (_split_source == nullptr) {
@@ -45,7 +45,7 @@ Status RemoteSplitSourceConnector::get_next(bool* has_next, TFileRangeDesc* rang
     std::lock_guard<std::mutex> l(_range_lock);
     *has_next = false;
     if (_scan_index == _scan_ranges.size() && !_last_batch) {
-        SCOPED_RAW_TIMER(&_get_split_timer);
+        SCOPED_TIMER(_get_split_timer);
         Status coord_status;
         FrontendServiceConnection coord(_state->exec_env()->frontend_client_cache(),
                                         _state->get_query_ctx()->coord_addr, &coord_status);
@@ -43,8 +43,6 @@ public:
     virtual int num_scan_ranges() = 0;
 
     virtual TFileScanRangeParams* get_params() = 0;
-
-    virtual int64_t get_split_time() { return 0; }
 };
 
 /**
@@ -89,6 +87,7 @@ class RemoteSplitSourceConnector : public SplitSourceConnector {
 private:
     std::mutex _range_lock;
     RuntimeState* _state;
+    RuntimeProfile::Counter* _get_split_timer;
     int64 _split_source_id;
     int _num_splits;
 
@@ -97,11 +96,13 @@ private:
     int _scan_index = 0;
     int _range_index = 0;
 
-    int64_t _get_split_timer = 0;
-
 public:
-    RemoteSplitSourceConnector(RuntimeState* state, int64 split_source_id, int num_splits)
-            : _state(state), _split_source_id(split_source_id), _num_splits(num_splits) {}
+    RemoteSplitSourceConnector(RuntimeState* state, RuntimeProfile::Counter* get_split_timer,
+                               int64 split_source_id, int num_splits)
+            : _state(state),
+              _get_split_timer(get_split_timer),
+              _split_source_id(split_source_id),
+              _num_splits(num_splits) {}
 
     Status get_next(bool* has_next, TFileRangeDesc* range) override;
 
@@ -114,8 +115,6 @@ public:
     TFileScanRangeParams* get_params() override {
         LOG(FATAL) << "Unreachable, params is got by file_scan_range_params_map";
     }
-
-    int64_t get_split_time() override { return _get_split_timer; }
 };
 
 } // namespace doris::vectorized
@@ -165,7 +165,6 @@ Status VFileScanner::prepare(
         _file_counter = ADD_COUNTER(_parent->_scanner_profile, "FileNumber", TUnit::UNIT);
         _has_fully_rf_file_counter =
                 ADD_COUNTER(_parent->_scanner_profile, "HasFullyRfFileNumber", TUnit::UNIT);
-        _get_split_timer = ADD_TIMER(_parent->_scanner_profile, "GetSplitTime");
     } else {
         _get_block_timer = ADD_TIMER(_local_state->scanner_profile(), "FileScannerGetBlockTime");
         _open_reader_timer =
@@ -184,7 +183,6 @@ Status VFileScanner::prepare(
         _file_counter = ADD_COUNTER(_local_state->scanner_profile(), "FileNumber", TUnit::UNIT);
         _has_fully_rf_file_counter =
                 ADD_COUNTER(_local_state->scanner_profile(), "HasFullyRfFileNumber", TUnit::UNIT);
-        _get_split_timer = ADD_TIMER(_local_state->scanner_profile(), "GetSplitTime");
     }
 
     _file_cache_statistics.reset(new io::FileCacheStatistics());
@@ -1197,7 +1195,6 @@ Status VFileScanner::close(RuntimeState* state) {
     if (_cur_reader) {
         RETURN_IF_ERROR(_cur_reader->close());
     }
-    COUNTER_UPDATE(_get_split_timer, _split_source->get_split_time());
 
     RETURN_IF_ERROR(VScanner::close(state));
     return Status::OK();
@@ -184,7 +184,6 @@ private:
     RuntimeProfile::Counter* _empty_file_counter = nullptr;
     RuntimeProfile::Counter* _file_counter = nullptr;
     RuntimeProfile::Counter* _has_fully_rf_file_counter = nullptr;
-    RuntimeProfile::Counter* _get_split_timer = nullptr;
 
     const std::unordered_map<std::string, int>* _col_name_to_slot_id = nullptr;
     // single slot filter conjuncts
@@ -337,10 +337,11 @@ public abstract class FileQueryScanNode extends FileScanNode {
                 locationType = getLocationType(fileSplit.getPath().toString());
             }
             totalFileSize = fileSplit.getLength() * inputSplitsNum;
+            long maxWaitTime = ConnectContext.get().getSessionVariable().getFetchSplitsMaxWaitTime();
             // Not accurate, only used to estimate concurrency.
             int numSplitsPerBE = numApproximateSplits() / backendPolicy.numBackends();
             for (Backend backend : backendPolicy.getBackends()) {
-                SplitSource splitSource = new SplitSource(backend, splitAssignment);
+                SplitSource splitSource = new SplitSource(backend, splitAssignment, maxWaitTime);
                 splitSources.add(splitSource);
                 Env.getCurrentEnv().getSplitSourceManager().registerSplitSource(splitSource);
                 TScanRangeLocations curLocations = newLocations();
@@ -44,17 +44,18 @@ import java.util.concurrent.atomic.AtomicLong;
 public class SplitSource {
     private static final AtomicLong UNIQUE_ID_GENERATOR = new AtomicLong(0);
     private static final long WAIT_TIME_OUT = 100; // 100ms
-    private static final long MAX_WAIT_TIME_OUT = 500; // 500ms
 
     private final long uniqueId;
     private final Backend backend;
     private final SplitAssignment splitAssignment;
     private final AtomicBoolean isLastBatch;
+    private final long maxWaitTime;
 
-    public SplitSource(Backend backend, SplitAssignment splitAssignment) {
+    public SplitSource(Backend backend, SplitAssignment splitAssignment, long maxWaitTime) {
         this.uniqueId = UNIQUE_ID_GENERATOR.getAndIncrement();
         this.backend = backend;
         this.splitAssignment = splitAssignment;
+        this.maxWaitTime = maxWaitTime;
         this.isLastBatch = new AtomicBoolean(false);
         splitAssignment.registerSource(uniqueId);
     }
@@ -71,7 +72,7 @@ public class SplitSource {
             return Collections.emptyList();
         }
         List<TScanRangeLocations> scanRanges = Lists.newArrayListWithExpectedSize(maxBatchSize);
-        long maxTimeOut = 0;
+        long startTime = System.currentTimeMillis();
         while (scanRanges.size() < maxBatchSize) {
             BlockingQueue<Collection<TScanRangeLocations>> splits = splitAssignment.getAssignedSplits(backend);
             if (splits == null) {
@@ -81,18 +82,19 @@ public class SplitSource {
             while (scanRanges.size() < maxBatchSize) {
                 try {
                     Collection<TScanRangeLocations> splitCollection = splits.poll(WAIT_TIME_OUT, TimeUnit.MILLISECONDS);
+                    if (splitCollection != null) {
+                        scanRanges.addAll(splitCollection);
+                    }
+                    if (!scanRanges.isEmpty() && System.currentTimeMillis() - startTime > maxWaitTime) {
+                        return scanRanges;
+                    }
                     if (splitCollection == null) {
-                        maxTimeOut += WAIT_TIME_OUT;
                         break;
                     }
-                    scanRanges.addAll(splitCollection);
                 } catch (InterruptedException e) {
                     throw new UserException("Failed to get next batch of splits", e);
                 }
             }
-            if (maxTimeOut >= MAX_WAIT_TIME_OUT && !scanRanges.isEmpty()) {
-                break;
-            }
         }
         return scanRanges;
     }
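The rewritten getNextBatch loop above drops the fixed MAX_WAIT_TIME_OUT budget in favor of a per-source deadline: splits are still drained from the per-backend queue in WAIT_TIME_OUT (100 ms) poll slices, but once anything has been collected and more than maxWaitTime has elapsed since the batch started, the partial batch is returned instead of waiting to fill maxBatchSize. Below is a minimal, self-contained sketch of this poll-with-deadline pattern; the class and method names are illustrative, not part of the patch.

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Illustrative helper: drain a queue of split chunks in short poll slices and
// return early once something has been collected and the deadline has passed.
final class DeadlineBatcher {
    private static final long POLL_SLICE_MS = 100; // mirrors SplitSource.WAIT_TIME_OUT

    static <T> List<T> nextBatch(BlockingQueue<Collection<T>> queue, int maxBatchSize, long maxWaitMs)
            throws InterruptedException {
        List<T> batch = new ArrayList<>(maxBatchSize);
        long startTime = System.currentTimeMillis();
        while (batch.size() < maxBatchSize) {
            Collection<T> chunk = queue.poll(POLL_SLICE_MS, TimeUnit.MILLISECONDS);
            if (chunk != null) {
                batch.addAll(chunk);
            }
            boolean deadlineHit = System.currentTimeMillis() - startTime > maxWaitMs;
            if (deadlineHit && !batch.isEmpty()) {
                break; // return a partial batch rather than keep waiting for a full one
            }
            if (deadlineHit && chunk == null) {
                break; // nothing arrived within the deadline; return whatever we have
            }
        }
        return batch;
    }
}

The real method additionally re-fetches the per-backend queue from splitAssignment when a poll slice comes back empty, and wraps InterruptedException in a UserException.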
@@ -419,6 +419,8 @@ public class SessionVariable implements Serializable, Writable {
 
     public static final String NUM_PARTITIONS_IN_BATCH_MODE = "num_partitions_in_batch_mode";
 
+    public static final String FETCH_SPLITS_MAX_WAIT_TIME = "fetch_splits_max_wait_time_ms";
+
     /**
      * use insert stmt as the unified backend for all loads
      */
@@ -1471,6 +1473,13 @@ public class SessionVariable implements Serializable, Writable {
             needForward = true)
     public int numPartitionsInBatchMode = 1024;
 
+    @VariableMgr.VarAttr(
+            name = FETCH_SPLITS_MAX_WAIT_TIME,
+            description = {"batch方式中BE获取splits的最大等待时间",
+                    "The max wait time of getting splits in batch mode."},
+            needForward = true)
+    public long fetchSplitsMaxWaitTime = 4000;
+
     @VariableMgr.VarAttr(
             name = ENABLE_PARQUET_LAZY_MAT,
             description = {"控制 parquet reader 是否启用延迟物化技术。默认为 true。",
@@ -2731,6 +2740,14 @@ public class SessionVariable implements Serializable, Writable {
         this.numPartitionsInBatchMode = numPartitionsInBatchMode;
     }
 
+    public long getFetchSplitsMaxWaitTime() {
+        return fetchSplitsMaxWaitTime;
+    }
+
+    public void setFetchSplitsMaxWaitTime(long fetchSplitsMaxWaitTime) {
+        this.fetchSplitsMaxWaitTime = fetchSplitsMaxWaitTime;
+    }
+
     public boolean isEnableParquetLazyMat() {
         return enableParquetLazyMat;
     }
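Together with the FileQueryScanNode change above, the new fetch_splits_max_wait_time_ms session variable (default 4000 ms) flows from the FE into each SplitSource via getFetchSplitsMaxWaitTime(), so the per-backend split fetch deadline becomes user-tunable instead of being capped by the removed 500 ms MAX_WAIT_TIME_OUT constant. As a @VariableMgr.VarAttr session variable it should be adjustable per session in the usual way, e.g. SET fetch_splits_max_wait_time_ms = 2000; that SET statement is shown only for illustration and is not part of the patch, which adds just the variable and its accessors.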