[opt](scan) read scan ranges in the order of partitions (#33515) (#33657)

backport: #33515
This commit is contained in:
Ashin Gau
2024-04-15 16:20:12 +08:00
committed by yiguolei
parent d0394b7f89
commit 2cd4012541
4 changed files with 79 additions and 43 deletions

View File

@ -73,20 +73,34 @@ void FileScanLocalState::set_scan_ranges(RuntimeState* state,
_scan_ranges = scan_ranges;
} else {
// There is no need for the number of scanners to exceed the number of threads in thread pool.
_scan_ranges.clear();
auto range_iter = scan_ranges.begin();
for (int i = 0; i < max_scanners && range_iter != scan_ranges.end(); ++i, ++range_iter) {
_scan_ranges.push_back(*range_iter);
}
for (int i = 0; range_iter != scan_ranges.end(); ++i, ++range_iter) {
if (i == max_scanners) {
i = 0;
// scan_ranges is sorted by path(as well as partition path) in FE, so merge scan ranges in order.
// In the insert statement, reading data in partition order can reduce the memory usage of BE
// and prevent the generation of smaller tables.
_scan_ranges.resize(max_scanners);
int num_ranges = scan_ranges.size() / max_scanners;
int num_add_one = scan_ranges.size() - num_ranges * max_scanners;
int scan_index = 0;
int range_index = 0;
for (int i = 0; i < num_add_one; ++i) {
_scan_ranges[scan_index] = scan_ranges[range_index++];
auto& ranges =
_scan_ranges[scan_index++].scan_range.ext_scan_range.file_scan_range.ranges;
for (int j = 0; j < num_ranges; j++) {
auto& merged_ranges =
scan_ranges[range_index++].scan_range.ext_scan_range.file_scan_range.ranges;
ranges.insert(ranges.end(), merged_ranges.begin(), merged_ranges.end());
}
}
for (int i = num_add_one; i < max_scanners; ++i) {
_scan_ranges[scan_index] = scan_ranges[range_index++];
auto& ranges =
_scan_ranges[scan_index++].scan_range.ext_scan_range.file_scan_range.ranges;
for (int j = 0; j < num_ranges - 1; j++) {
auto& merged_ranges =
scan_ranges[range_index++].scan_range.ext_scan_range.file_scan_range.ranges;
ranges.insert(ranges.end(), merged_ranges.begin(), merged_ranges.end());
}
auto& ranges = _scan_ranges[i].scan_range.ext_scan_range.file_scan_range.ranges;
auto& merged_ranges = range_iter->scan_range.ext_scan_range.file_scan_range.ranges;
ranges.insert(ranges.end(), merged_ranges.begin(), merged_ranges.end());
}
_scan_ranges.shrink_to_fit();
LOG(INFO) << "Merge " << scan_ranges.size() << " scan ranges to " << _scan_ranges.size();
}
if (scan_ranges.size() > 0 &&

View File

@ -71,20 +71,34 @@ void NewFileScanNode::set_scan_ranges(RuntimeState* state,
_scan_ranges = scan_ranges;
} else {
// There is no need for the number of scanners to exceed the number of threads in thread pool.
_scan_ranges.clear();
auto range_iter = scan_ranges.begin();
for (int i = 0; i < max_scanners && range_iter != scan_ranges.end(); ++i, ++range_iter) {
_scan_ranges.push_back(*range_iter);
}
for (int i = 0; range_iter != scan_ranges.end(); ++i, ++range_iter) {
if (i == max_scanners) {
i = 0;
// scan_ranges is sorted by path(as well as partition path) in FE, so merge scan ranges in order.
// In the insert statement, reading data in partition order can reduce the memory usage of BE
// and prevent the generation of smaller tables.
_scan_ranges.resize(max_scanners);
int num_ranges = scan_ranges.size() / max_scanners;
int num_add_one = scan_ranges.size() - num_ranges * max_scanners;
int scan_index = 0;
int range_index = 0;
for (int i = 0; i < num_add_one; ++i) {
_scan_ranges[scan_index] = scan_ranges[range_index++];
auto& ranges =
_scan_ranges[scan_index++].scan_range.ext_scan_range.file_scan_range.ranges;
for (int j = 0; j < num_ranges; j++) {
auto& merged_ranges =
scan_ranges[range_index++].scan_range.ext_scan_range.file_scan_range.ranges;
ranges.insert(ranges.end(), merged_ranges.begin(), merged_ranges.end());
}
}
for (int i = num_add_one; i < max_scanners; ++i) {
_scan_ranges[scan_index] = scan_ranges[range_index++];
auto& ranges =
_scan_ranges[scan_index++].scan_range.ext_scan_range.file_scan_range.ranges;
for (int j = 0; j < num_ranges - 1; j++) {
auto& merged_ranges =
scan_ranges[range_index++].scan_range.ext_scan_range.file_scan_range.ranges;
ranges.insert(ranges.end(), merged_ranges.begin(), merged_ranges.end());
}
auto& ranges = _scan_ranges[i].scan_range.ext_scan_range.file_scan_range.ranges;
auto& merged_ranges = range_iter->scan_range.ext_scan_range.file_scan_range.ranges;
ranges.insert(ranges.end(), merged_ranges.begin(), merged_ranges.end());
}
_scan_ranges.shrink_to_fit();
LOG(INFO) << "Merge " << scan_ranges.size() << " scan ranges to " << _scan_ranges.size();
}
if (scan_ranges.size() > 0 &&

View File

@ -211,24 +211,18 @@ public class FederationBackendPolicy {
this.enableSplitsRedistribution = enableSplitsRedistribution;
}
/**
* Assign splits to each backend. Ensure that each backend receives a similar amount of data.
* In order to make sure backends utilize the os page cache as much as possible, and all backends read splits
* in the order of partitions(reading data in partition order can reduce the memory usage of backends),
* splits should be sorted by path.
* Fortunately, the process of obtaining splits ensures that the splits have been sorted according to the path.
* If the splits are unordered, it is strongly recommended to sort them before calling this function.
*/
public Multimap<Backend, Split> computeScanRangeAssignment(List<Split> splits) throws UserException {
// Sorting splits is to ensure that the same query utilizes the os page cache as much as possible.
splits.sort((split1, split2) -> {
int pathComparison = split1.getPathString().compareTo(split2.getPathString());
if (pathComparison != 0) {
return pathComparison;
}
int startComparison = Long.compare(split1.getStart(), split2.getStart());
if (startComparison != 0) {
return startComparison;
}
return Long.compare(split1.getLength(), split2.getLength());
});
ListMultimap<Backend, Split> assignment = ArrayListMultimap.create();
List<Split> remainingSplits = null;
List<Split> remainingSplits;
List<Backend> backends = new ArrayList<>();
for (List<Backend> backendList : backendMap.values()) {
@ -242,8 +236,7 @@ public class FederationBackendPolicy {
// locality information
if (Config.split_assigner_optimized_local_scheduling) {
remainingSplits = new ArrayList<>(splits.size());
for (int i = 0; i < splits.size(); ++i) {
Split split = splits.get(i);
for (Split split : splits) {
if (split.isRemotelyAccessible() && (split.getHosts() != null && split.getHosts().length > 0)) {
List<Backend> candidateNodes = selectExactNodes(backendMap, split.getHosts());

View File

@ -288,6 +288,21 @@ public class FederationBackendPolicyTest {
}
public static void sortSplits(List<Split> splits) {
splits.sort((split1, split2) -> {
int pathComparison = split1.getPathString().compareTo(split2.getPathString());
if (pathComparison != 0) {
return pathComparison;
}
int startComparison = Long.compare(split1.getStart(), split2.getStart());
if (startComparison != 0) {
return startComparison;
}
return Long.compare(split1.getLength(), split2.getLength());
});
}
@Test
public void testGenerateRandomly() throws UserException {
SystemInfoService service = new SystemInfoService();
@ -367,7 +382,7 @@ public class FederationBackendPolicyTest {
List<Split> totalSplits = new ArrayList<>();
totalSplits.addAll(remoteSplits);
totalSplits.addAll(localSplits);
Collections.shuffle(totalSplits);
sortSplits(totalSplits);
Multimap<Backend, Split> assignment = policy.computeScanRangeAssignment(totalSplits);
if (i == 0) {
result = ArrayListMultimap.create(assignment);
@ -489,7 +504,7 @@ public class FederationBackendPolicyTest {
List<Split> totalSplits = new ArrayList<>();
totalSplits.addAll(remoteSplits);
totalSplits.addAll(localSplits);
Collections.shuffle(totalSplits);
sortSplits(totalSplits);
Multimap<Backend, Split> assignment = policy.computeScanRangeAssignment(totalSplits);
if (i == 0) {
result = ArrayListMultimap.create(assignment);