[feature-wip](multi-catalog) Optimize threads and thrift interface of FileScanNode (#10942)
FileScanNode in BE launches as many threads as there are splits, and the thrift interface of FileScanNode is excessively redundant.
This commit is contained in:
@ -17,12 +17,14 @@
|
||||
|
||||
#include "vec/exec/file_scan_node.h"
|
||||
|
||||
#include "common/config.h"
|
||||
#include "gen_cpp/PlanNodes_types.h"
|
||||
#include "runtime/mem_tracker.h"
|
||||
#include "runtime/runtime_state.h"
|
||||
#include "runtime/string_value.h"
|
||||
#include "runtime/tuple.h"
|
||||
#include "runtime/tuple_row.h"
|
||||
#include "util/priority_thread_pool.hpp"
|
||||
#include "util/runtime_profile.h"
|
||||
#include "util/thread.h"
|
||||
#include "util/types.h"
|
||||
@ -100,9 +102,37 @@ Status FileScanNode::start_scanners() {
|
||||
}
|
||||
|
||||
_scanners_status.resize(_scan_ranges.size());
|
||||
for (int i = 0; i < _scan_ranges.size(); i++) {
|
||||
_scanner_threads.emplace_back(&FileScanNode::scanner_worker, this, i, _scan_ranges.size(),
|
||||
std::ref(_scanners_status[i]));
|
||||
ThreadPoolToken* thread_token = _runtime_state->get_query_fragments_ctx()->get_token();
|
||||
PriorityThreadPool* thread_pool = _runtime_state->exec_env()->scan_thread_pool();
|
||||
for (int i = 0; i < _scan_ranges.size(); ++i) {
|
||||
Status submit_status = Status::OK();
|
||||
if (thread_token != nullptr) {
|
||||
submit_status = thread_token->submit_func(std::bind(&FileScanNode::scanner_worker, this,
|
||||
i, _scan_ranges.size(),
|
||||
std::ref(_scanners_status[i])));
|
||||
} else {
|
||||
PriorityThreadPool::WorkFunction task =
|
||||
std::bind(&FileScanNode::scanner_worker, this, i, _scan_ranges.size(),
|
||||
std::ref(_scanners_status[i]));
|
||||
if (!thread_pool->offer(task)) {
|
||||
submit_status = Status::Cancelled("Failed to submit scan task");
|
||||
}
|
||||
}
|
||||
if (!submit_status.ok()) {
|
||||
LOG(WARNING) << "Failed to assign file scanner task to thread pool! "
|
||||
<< submit_status.get_error_msg();
|
||||
_scanners_status[i].set_value(submit_status);
|
||||
for (int j = i + 1; j < _scan_ranges.size(); ++j) {
|
||||
_scanners_status[j].set_value(Status::Cancelled("Cancelled"));
|
||||
}
|
||||
{
|
||||
std::lock_guard<std::mutex> l(_batch_queue_lock);
|
||||
update_status(submit_status);
|
||||
_num_running_scanners -= _scan_ranges.size() - i;
|
||||
}
|
||||
_queue_writer_cond.notify_all();
|
||||
break;
|
||||
}
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
@ -205,8 +235,9 @@ Status FileScanNode::close(RuntimeState* state) {
|
||||
_scan_finished.store(true);
|
||||
_queue_writer_cond.notify_all();
|
||||
_queue_reader_cond.notify_all();
|
||||
for (int i = 0; i < _scanner_threads.size(); ++i) {
|
||||
_scanner_threads[i].join();
|
||||
{
|
||||
std::unique_lock<std::mutex> l(_batch_queue_lock);
|
||||
_queue_reader_cond.wait(l, [this] { return _num_running_scanners == 0; });
|
||||
}
|
||||
for (int i = 0; i < _scanners_status.size(); i++) {
|
||||
std::future<Status> f = _scanners_status[i].get_future();
|
||||
@ -308,7 +339,7 @@ void FileScanNode::scanner_worker(int start_idx, int length, std::promise<Status
|
||||
std::unique_ptr<FileScanner> FileScanNode::create_scanner(const TFileScanRange& scan_range,
|
||||
ScannerCounter* counter) {
|
||||
FileScanner* scan = nullptr;
|
||||
switch (scan_range.ranges[0].format_type) {
|
||||
switch (scan_range.params.format_type) {
|
||||
case TFileFormatType::FORMAT_PARQUET:
|
||||
scan = new VFileParquetScanner(_runtime_state, runtime_profile(), scan_range.params,
|
||||
scan_range.ranges, _pre_filter_texprs, counter);
|
||||
@ -329,7 +360,27 @@ std::unique_ptr<FileScanner> FileScanNode::create_scanner(const TFileScanRange&
|
||||
|
||||
// This function is called after plan node has been prepared.
|
||||
Status FileScanNode::set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) {
|
||||
_scan_ranges = scan_ranges;
|
||||
int max_scanners = config::doris_scanner_thread_pool_thread_num;
|
||||
if (scan_ranges.size() <= max_scanners) {
|
||||
_scan_ranges = scan_ranges;
|
||||
} else {
|
||||
// There is no need for the number of scanners to exceed the number of threads in thread pool.
|
||||
_scan_ranges.clear();
|
||||
auto range_iter = scan_ranges.begin();
|
||||
for (int i = 0; i < max_scanners && range_iter != scan_ranges.end(); ++i, ++range_iter) {
|
||||
_scan_ranges.push_back(*range_iter);
|
||||
}
|
||||
for (int i = 0; range_iter != scan_ranges.end(); ++i, ++range_iter) {
|
||||
if (i == max_scanners) {
|
||||
i = 0;
|
||||
}
|
||||
auto& ranges = _scan_ranges[i].scan_range.ext_scan_range.file_scan_range.ranges;
|
||||
auto& merged_ranges = range_iter->scan_range.ext_scan_range.file_scan_range.ranges;
|
||||
ranges.insert(ranges.end(), merged_ranges.begin(), merged_ranges.end());
|
||||
}
|
||||
_scan_ranges.shrink_to_fit();
|
||||
LOG(INFO) << "Merge " << scan_ranges.size() << " scan ranges to " << _scan_ranges.size();
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user