@ -47,6 +47,7 @@ Status RemoteSplitSourceConnector::get_next(bool* has_next, TFileRangeDesc* rang
|
||||
if (_scan_index == _scan_ranges.size() && !_last_batch) {
|
||||
SCOPED_TIMER(_get_split_timer);
|
||||
Status coord_status;
|
||||
// No need to set timeout because on FE side, there is a max fetch time
|
||||
FrontendServiceConnection coord(_state->exec_env()->frontend_client_cache(),
|
||||
_state->get_query_ctx()->coord_addr, &coord_status);
|
||||
RETURN_IF_ERROR(coord_status);
|
||||
@ -56,6 +57,10 @@ Status RemoteSplitSourceConnector::get_next(bool* has_next, TFileRangeDesc* rang
|
||||
TFetchSplitBatchResult result;
|
||||
try {
|
||||
coord->fetchSplitBatch(result, request);
|
||||
if (result.__isset.status && result.status.status_code != TStatusCode::OK) {
|
||||
return Status::IOError<false>("Failed to get batch of split source: {}",
|
||||
result.status.error_msgs[0]);
|
||||
}
|
||||
} catch (std::exception& e) {
|
||||
return Status::IOError<false>("Failed to get batch of split source: {}", e.what());
|
||||
}
|
||||
|
||||
@ -73,7 +73,7 @@ public class SplitSource {
|
||||
}
|
||||
List<TScanRangeLocations> scanRanges = Lists.newArrayListWithExpectedSize(maxBatchSize);
|
||||
long startTime = System.currentTimeMillis();
|
||||
while (scanRanges.size() < maxBatchSize) {
|
||||
while (scanRanges.size() < maxBatchSize && System.currentTimeMillis() - startTime < maxWaitTime) {
|
||||
BlockingQueue<Collection<TScanRangeLocations>> splits = splitAssignment.getAssignedSplits(backend);
|
||||
if (splits == null) {
|
||||
isLastBatch.set(true);
|
||||
@ -92,10 +92,15 @@ public class SplitSource {
|
||||
break;
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
throw new UserException("Failed to get next batch of splits", e);
|
||||
throw new UserException(e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (scanRanges.isEmpty() && !isLastBatch.get()) {
|
||||
// This is timeout
|
||||
throw new UserException("Timeout. Max wait time(ms): " + maxWaitTime);
|
||||
}
|
||||
return scanRanges;
|
||||
}
|
||||
}
|
||||
|
||||
@ -976,10 +976,14 @@ public class FrontendServiceImpl implements FrontendService.Iface {
|
||||
try {
|
||||
List<TScanRangeLocations> locations = splitSource.getNextBatch(request.getMaxNumSplits());
|
||||
result.setSplits(locations);
|
||||
result.status = new TStatus(TStatusCode.OK);
|
||||
return result;
|
||||
} catch (Exception e) {
|
||||
throw new TException("Failed to get split source " + request.getSplitSourceId(), e);
|
||||
LOG.warn("failed to fetch split batch with source id {}", request.getSplitSourceId(), e);
|
||||
result.status = new TStatus(TStatusCode.INTERNAL_ERROR);
|
||||
result.status.addToErrorMsgs(e.getMessage());
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -1478,6 +1478,7 @@ struct TFetchSplitBatchRequest {
|
||||
|
||||
struct TFetchSplitBatchResult {
|
||||
1: optional list<Planner.TScanRangeLocations> splits
|
||||
2: optional Status.TStatus status
|
||||
}
|
||||
|
||||
struct TFetchRunningQueriesResult {
|
||||
|
||||
Reference in New Issue
Block a user