[PipelineX](improvement) Prepare tasks in parallel (#40270)

## Proposed changes

Issue Number: close #xxx

Prepare pipeline tasks in parallel. When a fragment has more instances than the new `parallel_prepare_threshold` session variable, `PipelineXFragmentContext::prepare()` builds and submits the per-instance tasks on the thread pool that `FragmentMgr` passes in, instead of preparing them serially on the calling thread. To make this safe, the task counters become `std::atomic<int>`, task runtime states are stored per pipeline and per instance in pre-sized vectors, and shared containers are written by index instead of `push_back`.
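
A minimal sketch of the fan-out/fan-in pattern this introduces in `_build_pipeline_x_tasks`: `std::thread` stands in for the backend `ThreadPool`, and `prepare_one()` is a placeholder for `pre_and_submit(i, ctx)`; all names in the block are illustrative only.

```cpp
#include <algorithm>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Placeholder for pre_and_submit(i, ctx): prepare one fragment instance.
bool prepare_one(int /*instance_idx*/) { return true; }

// Fan out one prepare job per instance, then wait until every job has
// reported back before checking the per-instance results.
bool prepare_all_in_parallel(int target_size) {
    std::vector<int> ok(target_size, 0);  // one slot per instance, no push_back
    std::mutex m;
    std::condition_variable cv;
    int done = 0;

    std::vector<std::thread> workers;
    workers.reserve(target_size);
    for (int i = 0; i < target_size; ++i) {
        workers.emplace_back([&, i] {
            ok[i] = prepare_one(i) ? 1 : 0;
            std::lock_guard<std::mutex> lock(m);
            if (++done == target_size) {
                cv.notify_one();  // last finisher wakes the coordinating thread
            }
        });
    }

    {
        std::unique_lock<std::mutex> lock(m);
        cv.wait(lock, [&] { return done == target_size; });
    }
    for (auto& t : workers) {
        t.join();
    }
    return std::all_of(ok.begin(), ok.end(), [](int v) { return v == 1; });
}

int main() { return prepare_all_in_parallel(8) ? 0 : 1; }
```

The real change additionally attaches the query context (`SCOPED_ATTACH_TASK`), records a per-instance `Status`, and only takes the parallel path when the instance count exceeds `parallel_prepare_threshold`.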
Commit 3395cd5ce9 (parent 76495b6d5c), authored by Gabriel on 2024-09-13 13:34:29 +08:00 and committed via GitHub. 10 changed files with 97 additions and 40 deletions.


@@ -514,7 +514,7 @@ DEFINE_Int32(brpc_heavy_work_pool_max_queue_size, "-1");
DEFINE_Int32(brpc_light_work_pool_max_queue_size, "-1");
// The maximum amount of data that can be processed by a stream load
-DEFINE_mInt64(streaming_load_max_mb, "10240");
+DEFINE_mInt64(streaming_load_max_mb, "102400");
// Some data formats, such as JSON, cannot be streamed.
// Therefore, it is necessary to limit the maximum number of
// such data when using stream load to prevent excessive memory consumption.


@@ -1339,6 +1339,7 @@ void IRuntimeFilter::signal() {
}
void IRuntimeFilter::set_filter_timer(std::shared_ptr<pipeline::RuntimeFilterTimer> timer) {
+std::unique_lock lock(_inner_mutex);
_filter_timer.push_back(timer);
}


@@ -156,7 +156,6 @@ public:
void set_children(std::vector<std::shared_ptr<Pipeline>> children) { _children = children; }
void incr_created_tasks() { _num_tasks_created++; }
bool need_to_create_task() const { return _num_tasks > _num_tasks_created; }
void set_num_tasks(int num_tasks) {
_num_tasks = num_tasks;
for (auto& op : operatorXs) {
@@ -243,7 +242,7 @@ private:
// How many tasks should be created ?
int _num_tasks = 1;
// How many tasks are already created?
-int _num_tasks_created = 0;
+std::atomic<int> _num_tasks_created = 0;
};
} // namespace doris::pipeline


@@ -89,7 +89,7 @@ public:
Status prepare(const doris::TPipelineFragmentParams& request, size_t idx);
-virtual Status prepare(const doris::TPipelineFragmentParams& request) {
+virtual Status prepare(const doris::TPipelineFragmentParams& request, ThreadPool* thread_pool) {
return Status::InternalError("Pipeline fragment context do not implement prepare");
}
@@ -168,7 +168,7 @@ protected:
int _closed_tasks = 0;
// After prepared, `_total_tasks` is equal to the size of `_tasks`.
// When submit fail, `_total_tasks` is equal to the number of tasks submitted.
-int _total_tasks = 0;
+std::atomic<int> _total_tasks = 0;
int32_t _next_operator_builder_id = 10000;
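
Both `_total_tasks` above and `_num_tasks_created` in `pipeline.h` switch to `std::atomic<int>` because, with parallel prepare, several threads may create tasks at once; the post-increment on an atomic also hands each new task a distinct id. A small stand-alone sketch of that property (names here are illustrative, not Doris code):

```cpp
#include <atomic>
#include <thread>
#include <vector>

int main() {
    std::atomic<int> total_tasks{0};  // stand-in for _total_tasks
    std::vector<int> ids(8);

    std::vector<std::thread> workers;
    for (int i = 0; i < 8; ++i) {
        // Atomic post-increment: each call returns a unique previous value,
        // so concurrent task creation still yields distinct task ids.
        workers.emplace_back([&, i] { ids[i] = total_tasks++; });
    }
    for (auto& t : workers) {
        t.join();
    }
    // ids now holds a permutation of 0..7 and total_tasks == 8.
    return total_tasks == 8 ? 0 : 1;
}
```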


@@ -116,9 +116,13 @@ PipelineXFragmentContext::~PipelineXFragmentContext() {
auto st = _query_ctx->exec_status();
_tasks.clear();
if (!_task_runtime_states.empty()) {
-for (auto& runtime_state : _task_runtime_states) {
-_call_back(runtime_state.get(), &st);
-runtime_state.reset();
+for (auto& runtime_states : _task_runtime_states) {
+for (auto& runtime_state : runtime_states) {
+if (runtime_state) {
+_call_back(runtime_state.get(), &st);
+runtime_state.reset();
+}
+}
}
} else {
_call_back(nullptr, &st);
@@ -182,7 +186,8 @@ void PipelineXFragmentContext::cancel(const PPlanFragmentCancelReason& reason,
}
}
-Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& request) {
+Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& request,
+ThreadPool* thread_pool) {
if (_prepared) {
return Status::InternalError("Already prepared");
}
@@ -210,7 +215,6 @@ Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& r
_runtime_state = RuntimeState::create_unique(request.query_id, request.fragment_id,
request.query_options, _query_ctx->query_globals,
_exec_env, _query_ctx.get());
-SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_runtime_state->query_mem_tracker());
if (request.__isset.backend_id) {
_runtime_state->set_backend_id(request.backend_id);
@@ -284,7 +288,7 @@ Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& r
}
// 5. Build pipeline tasks and initialize local state.
-RETURN_IF_ERROR(_build_pipeline_tasks(request));
+RETURN_IF_ERROR(_build_pipeline_x_tasks(request, thread_pool));
_init_next_report_time();
@@ -511,11 +515,17 @@ Status PipelineXFragmentContext::_create_data_sink(ObjectPool* pool, const TData
return Status::OK();
}
-Status PipelineXFragmentContext::_build_pipeline_tasks(
-const doris::TPipelineFragmentParams& request) {
+Status PipelineXFragmentContext::_build_pipeline_x_tasks(
+const doris::TPipelineFragmentParams& request, ThreadPool* thread_pool) {
_total_tasks = 0;
int target_size = request.local_params.size();
_tasks.resize(target_size);
+_fragment_instance_ids.resize(target_size);
+_runtime_filter_states.resize(target_size);
+_task_runtime_states.resize(_pipelines.size());
+for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
+_task_runtime_states[pip_idx].resize(_pipelines[pip_idx]->num_tasks());
+}
auto& pipeline_id_to_profile = _runtime_state->pipeline_id_to_profile();
DCHECK(pipeline_id_to_profile.empty());
pipeline_id_to_profile.resize(_pipelines.size());
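
The containers are sized up front so that each instance's prepare job only writes its own slot (`[i]`, or `[pip_idx][i]` for the runtime states); concurrent `push_back` on a shared vector would be a data race, while indexed writes to distinct, pre-allocated elements are safe. A minimal sketch of that pattern (hypothetical names):

```cpp
#include <memory>
#include <thread>
#include <vector>

int main() {
    constexpr int kInstances = 4;  // stand-in for target_size
    // Pre-size the container once, before any worker starts.
    std::vector<std::unique_ptr<int>> per_instance_state(kInstances);

    std::vector<std::thread> workers;
    for (int i = 0; i < kInstances; ++i) {
        workers.emplace_back([&, i] {
            // Each worker touches only its own slot, so no lock is needed here.
            per_instance_state[i] = std::make_unique<int>(i);
        });
    }
    for (auto& t : workers) {
        t.join();
    }
    return per_instance_state[0] ? 0 : 1;
}
```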
@@ -528,10 +538,10 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
}
}
-for (size_t i = 0; i < target_size; i++) {
+auto pre_and_submit = [&](int i, PipelineFragmentContext* ctx) {
const auto& local_params = request.local_params[i];
auto fragment_instance_id = local_params.fragment_instance_id;
-_fragment_instance_ids.push_back(fragment_instance_id);
+_fragment_instance_ids[i] = fragment_instance_id;
std::unique_ptr<RuntimeFilterMgr> runtime_filter_mgr;
auto init_runtime_state = [&](std::unique_ptr<RuntimeState>& runtime_state) {
runtime_state->set_query_mem_tracker(_query_ctx->query_mem_tracker);
@@ -588,7 +598,7 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
filterparams->runtime_filter_mgr = runtime_filter_mgr.get();
-_runtime_filter_states.push_back(std::move(filterparams));
+_runtime_filter_states[i] = std::move(filterparams);
std::map<PipelineId, PipelineXTask*> pipeline_id_to_task;
auto get_local_exchange_state = [&](PipelinePtr pipeline)
-> std::map<int, std::pair<std::shared_ptr<LocalExchangeSharedState>,
@@ -608,32 +618,30 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
}
return le_state_map;
};
-auto get_task_runtime_state = [&](int task_id) -> RuntimeState* {
-DCHECK(_task_runtime_states[task_id]);
-return _task_runtime_states[task_id].get();
-};
for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
auto& pipeline = _pipelines[pip_idx];
-if (pipeline->need_to_create_task()) {
+if (pipeline->num_tasks() > 1 || i == 0) {
+auto cur_task_id = _total_tasks++;
+DCHECK(_task_runtime_states[pip_idx][i] == nullptr)
+<< print_id(_task_runtime_states[pip_idx][i]->fragment_instance_id()) << " "
+<< pipeline->debug_string();
// build task runtime state
-_task_runtime_states.push_back(RuntimeState::create_unique(
+_task_runtime_states[pip_idx][i] = RuntimeState::create_unique(
this, local_params.fragment_instance_id, request.query_id,
request.fragment_id, request.query_options, _query_ctx->query_globals,
-_exec_env, _query_ctx.get()));
-auto& task_runtime_state = _task_runtime_states.back();
+_exec_env, _query_ctx.get());
+auto& task_runtime_state = _task_runtime_states[pip_idx][i];
init_runtime_state(task_runtime_state);
-auto cur_task_id = _total_tasks++;
task_runtime_state->set_task_id(cur_task_id);
task_runtime_state->set_task_num(pipeline->num_tasks());
-auto task = std::make_unique<PipelineXTask>(
-pipeline, cur_task_id, get_task_runtime_state(cur_task_id), this,
-pipeline_id_to_profile[pip_idx].get(), get_local_exchange_state(pipeline),
-i);
+auto task = std::make_unique<PipelineXTask>(pipeline, cur_task_id,
+task_runtime_state.get(), ctx,
+pipeline_id_to_profile[pip_idx].get(),
+get_local_exchange_state(pipeline), i);
pipeline_id_to_task.insert({pipeline->id(), task.get()});
_tasks[i].emplace_back(std::move(task));
}
}
/**
* Build DAG for pipeline tasks.
* For example, we have
@@ -693,6 +701,40 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
std::lock_guard<std::mutex> l(_state_map_lock);
_runtime_filter_mgr_map[fragment_instance_id] = std::move(runtime_filter_mgr);
}
+return Status::OK();
+};
+if (target_size > 1 &&
+(_runtime_state->query_options().__isset.parallel_prepare_threshold &&
+target_size > _runtime_state->query_options().parallel_prepare_threshold)) {
+Status prepare_status[target_size];
+std::mutex m;
+std::condition_variable cv;
+int prepare_done = 0;
+for (size_t i = 0; i < target_size; i++) {
+RETURN_IF_ERROR(thread_pool->submit_func([&, i]() {
+SCOPED_ATTACH_TASK(_query_ctx.get());
+prepare_status[i] = pre_and_submit(i, this);
+std::unique_lock<std::mutex> lock(m);
+prepare_done++;
+if (prepare_done == target_size) {
+cv.notify_one();
+}
+}));
+}
+std::unique_lock<std::mutex> lock(m);
+if (prepare_done != target_size) {
+cv.wait(lock);
+for (size_t i = 0; i < target_size; i++) {
+if (!prepare_status[i].ok()) {
+return prepare_status[i];
+}
+}
+}
+} else {
+for (size_t i = 0; i < target_size; i++) {
+RETURN_IF_ERROR(pre_and_submit(i, this));
+}
+}
_pipeline_parent_map.clear();
_dag.clear();
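
The parallel path above is taken only when there is more than one instance and the instance count exceeds the session's `parallel_prepare_threshold`; otherwise the instances are still prepared serially on the calling thread. Submission failures are surfaced by `RETURN_IF_ERROR` around `submit_func`, and prepare failures through the per-instance `Status` array once all workers have reported back. A tiny, hypothetical helper that mirrors the gate:

```cpp
// Hypothetical helper mirroring the condition above: prepare in parallel only
// when the threshold is set and the instance count exceeds it.
bool should_prepare_in_parallel(int target_size, bool threshold_set, int threshold) {
    return target_size > 1 && threshold_set && target_size > threshold;
}
```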
@@ -1512,8 +1554,12 @@ Status PipelineXFragmentContext::send_report(bool done) {
std::vector<RuntimeState*> runtime_states;
-for (auto& task_state : _task_runtime_states) {
-runtime_states.push_back(task_state.get());
+for (auto& task_states : _task_runtime_states) {
+for (auto& task_state : task_states) {
+if (task_state) {
+runtime_states.push_back(task_state.get());
+}
+}
}
return _report_status_cb(
{true, exec_status, runtime_states, nullptr, _runtime_state->load_channel_profile(),


@@ -94,7 +94,7 @@ public:
}
// Prepare global information including global states and the unique operator tree shared by all pipeline tasks.
-Status prepare(const doris::TPipelineFragmentParams& request) override;
+Status prepare(const doris::TPipelineFragmentParams& request, ThreadPool* thread_pool) override;
Status submit() override;
@@ -118,7 +118,8 @@ public:
private:
void _close_fragment_instance() override;
-Status _build_pipeline_tasks(const doris::TPipelineFragmentParams& request) override;
+Status _build_pipeline_x_tasks(const doris::TPipelineFragmentParams& request,
+ThreadPool* thread_pool);
Status _add_local_exchange(int pip_idx, int idx, int node_id, ObjectPool* pool,
PipelinePtr cur_pipe, DataDistribution data_distribution,
bool* do_local_exchange, int num_buckets,
@@ -230,7 +231,7 @@ private:
std::vector<TUniqueId> _fragment_instance_ids;
// Local runtime states for each task
-std::vector<std::unique_ptr<RuntimeState>> _task_runtime_states;
+std::vector<std::vector<std::unique_ptr<RuntimeState>>> _task_runtime_states;
std::vector<std::unique_ptr<RuntimeFilterParamsContext>> _runtime_filter_states;


@@ -962,7 +962,7 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
std::placeholders::_1, std::placeholders::_2));
{
SCOPED_RAW_TIMER(&duration_ns);
-auto prepare_st = context->prepare(params);
+auto prepare_st = context->prepare(params, _thread_pool.get());
if (!prepare_st.ok()) {
context->close_if_prepare_failed(prepare_st);
query_ctx->set_execution_dependency_ready();
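
`FragmentMgr` hands its own worker pool to `prepare()`, so the prepare jobs share a managed pool instead of spawning ad-hoc threads. A sketch of building and using such a pool, assuming Doris's Kudu-derived `ThreadPoolBuilder`/`ThreadPool::submit_func` API (the latter is visible in this diff); the pool name and sizes below are illustrative only:

```cpp
#include <memory>

#include "util/threadpool.h"  // Doris backend utility (Kudu-derived)

// Sketch: build a pool and submit one prepare job per instance. submit_func
// returns a Status, which is why the diff wraps it in RETURN_IF_ERROR.
// Completion is tracked separately (the diff uses a mutex + condition_variable).
Status run_prepare_jobs(int target_size) {
    std::unique_ptr<ThreadPool> pool;
    RETURN_IF_ERROR(ThreadPoolBuilder("prepare_pool")  // illustrative name/sizes
                            .set_min_threads(4)
                            .set_max_threads(32)
                            .build(&pool));
    for (int i = 0; i < target_size; ++i) {
        RETURN_IF_ERROR(pool->submit_func([i]() {
            (void)i;  // placeholder for pre_and_submit(i, ctx)
        }));
    }
    return Status::OK();
}
```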


@@ -126,9 +126,6 @@ Status Channel<Parent>::init_stub(RuntimeState* state) {
_is_local &= state->query_options().enable_local_exchange;
}
if (_is_local) {
-WARN_IF_ERROR(_parent->state()->exec_env()->vstream_mgr()->find_recvr(
-_fragment_instance_id, _dest_node_id, &_local_recvr),
-"");
return Status::OK();
}
if (_brpc_dest_addr.hostname == BackendOptions::get_localhost()) {
@ -149,6 +146,11 @@ Status Channel<Parent>::init_stub(RuntimeState* state) {
template <typename Parent>
Status Channel<Parent>::open(RuntimeState* state) {
+if (_is_local) {
+WARN_IF_ERROR(_parent->state()->exec_env()->vstream_mgr()->find_recvr(
+_fragment_instance_id, _dest_node_id, &_local_recvr),
+"");
+}
_be_number = state->be_number();
_brpc_request = std::make_shared<PTransmitDataParams>();
// initialize brpc request


@@ -279,6 +279,8 @@ public class SessionVariable implements Serializable, Writable {
public static final String AUTO_BROADCAST_JOIN_THRESHOLD = "auto_broadcast_join_threshold";
+public static final String PARALLEL_PREPARE_THRESHOLD = "parallel_prepare_threshold";
public static final String ENABLE_PROJECTION = "enable_projection";
public static final String CHECK_OVERFLOW_FOR_DECIMAL = "check_overflow_for_decimal";
@@ -1004,7 +1006,7 @@ public class SessionVariable implements Serializable, Writable {
@VariableMgr.VarAttr(name = PARALLEL_SCAN_MIN_ROWS_PER_SCANNER, fuzzy = true,
varType = VariableAnnotation.EXPERIMENTAL, needForward = true)
-private long parallelScanMinRowsPerScanner = 16384; // 16K
+private long parallelScanMinRowsPerScanner = 2097152; // 2M
@VariableMgr.VarAttr(name = IGNORE_STORAGE_DATA_DISTRIBUTION, fuzzy = false,
varType = VariableAnnotation.EXPERIMENTAL, needForward = true)
@@ -1044,6 +1046,9 @@ public class SessionVariable implements Serializable, Writable {
@VariableMgr.VarAttr(name = AUTO_BROADCAST_JOIN_THRESHOLD)
public double autoBroadcastJoinThreshold = 0.8;
+@VariableMgr.VarAttr(name = PARALLEL_PREPARE_THRESHOLD)
+public int parallelPrepareThreshold = 32;
@VariableMgr.VarAttr(name = ENABLE_COST_BASED_JOIN_REORDER)
private boolean enableJoinReorderBasedCost = false;
@@ -3426,6 +3431,7 @@ public class SessionVariable implements Serializable, Writable {
tResult.setNumScannerThreads(numScannerThreads);
tResult.setScannerScaleUpRatio(scannerScaleUpRatio);
tResult.setMaxColumnReaderNum(maxColumnReaderNum);
+tResult.setParallelPrepareThreshold(parallelPrepareThreshold);
// TODO chenhao, reservation will be calculated by cost
tResult.setMinReservation(0);


@@ -329,6 +329,8 @@ struct TQueryOptions {
130: optional bool enable_adaptive_pipeline_task_serial_read_on_limit = true;
131: optional i32 adaptive_pipeline_task_serial_read_on_limit = 10000;
+// only in 2.1
+999: optional i32 parallel_prepare_threshold = 0;
// For cloud, to control if the content would be written into file cache
1000: optional bool disable_file_cache = false
}