From a3fd13fee671d9667e85d8d1be454f2bdb2a2b94 Mon Sep 17 00:00:00 2001 From: Mingyu Chen Date: Tue, 20 Aug 2024 21:59:55 +0800 Subject: [PATCH] [fix](catalog) set timeout for split fetch (#39346) (#39624) bp #39346 --- be/src/vec/exec/scan/split_source_connector.cpp | 5 +++++ .../java/org/apache/doris/datasource/SplitSource.java | 9 +++++++-- .../org/apache/doris/service/FrontendServiceImpl.java | 6 +++++- gensrc/thrift/FrontendService.thrift | 1 + 4 files changed, 18 insertions(+), 3 deletions(-) diff --git a/be/src/vec/exec/scan/split_source_connector.cpp b/be/src/vec/exec/scan/split_source_connector.cpp index 478af522e7..cefe5b7021 100644 --- a/be/src/vec/exec/scan/split_source_connector.cpp +++ b/be/src/vec/exec/scan/split_source_connector.cpp @@ -47,6 +47,7 @@ Status RemoteSplitSourceConnector::get_next(bool* has_next, TFileRangeDesc* rang if (_scan_index == _scan_ranges.size() && !_last_batch) { SCOPED_TIMER(_get_split_timer); Status coord_status; + // No need to set timeout because on FE side, there is a max fetch time FrontendServiceConnection coord(_state->exec_env()->frontend_client_cache(), _state->get_query_ctx()->coord_addr, &coord_status); RETURN_IF_ERROR(coord_status); @@ -56,6 +57,10 @@ Status RemoteSplitSourceConnector::get_next(bool* has_next, TFileRangeDesc* rang TFetchSplitBatchResult result; try { coord->fetchSplitBatch(result, request); + if (result.__isset.status && result.status.status_code != TStatusCode::OK) { + return Status::IOError("Failed to get batch of split source: {}", + result.status.error_msgs[0]); + } } catch (std::exception& e) { return Status::IOError("Failed to get batch of split source: {}", e.what()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitSource.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitSource.java index 8515e686f3..e24af76878 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitSource.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitSource.java @@ -73,7 +73,7 @@ public class SplitSource { } List scanRanges = Lists.newArrayListWithExpectedSize(maxBatchSize); long startTime = System.currentTimeMillis(); - while (scanRanges.size() < maxBatchSize) { + while (scanRanges.size() < maxBatchSize && System.currentTimeMillis() - startTime < maxWaitTime) { BlockingQueue> splits = splitAssignment.getAssignedSplits(backend); if (splits == null) { isLastBatch.set(true); @@ -92,10 +92,15 @@ public class SplitSource { break; } } catch (InterruptedException e) { - throw new UserException("Failed to get next batch of splits", e); + throw new UserException(e.getMessage(), e); } } } + + if (scanRanges.isEmpty() && !isLastBatch.get()) { + // This is timeout + throw new UserException("Timeout. Max wait time(ms): " + maxWaitTime); + } return scanRanges; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index b04b5aa389..7f727c51ba 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -976,10 +976,14 @@ public class FrontendServiceImpl implements FrontendService.Iface { try { List locations = splitSource.getNextBatch(request.getMaxNumSplits()); result.setSplits(locations); + result.status = new TStatus(TStatusCode.OK); return result; } catch (Exception e) { - throw new TException("Failed to get split source " + request.getSplitSourceId(), e); + LOG.warn("failed to fetch split batch with source id {}", request.getSplitSourceId(), e); + result.status = new TStatus(TStatusCode.INTERNAL_ERROR); + result.status.addToErrorMsgs(e.getMessage()); } + return result; } @Override diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 7d5b94bd9f..39edf990a6 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -1478,6 +1478,7 @@ struct TFetchSplitBatchRequest { struct TFetchSplitBatchResult { 1: optional list splits + 2: optional Status.TStatus status } struct TFetchRunningQueriesResult {