From 320ddf4987c9fdb33f9ee7aced3881009681bd51 Mon Sep 17 00:00:00 2001
From: Gabriel
Date: Sun, 10 Dec 2023 20:18:41 +0800
Subject: [PATCH] [pipelineX](improvement) Support multiple instances
 execution on single tablet (#28178)

---
 be/src/pipeline/exec/scan_operator.cpp        |   1 +
 be/src/pipeline/pipeline.h                    |  15 ++-
 be/src/pipeline/pipeline_fragment_context.cpp |   4 +-
 be/src/pipeline/pipeline_fragment_context.h   |   1 +
 .../local_exchange/local_exchanger.h          |  10 +-
 .../pipeline_x_fragment_context.cpp           | 108 ++++++++++++------
 .../pipeline_x/pipeline_x_fragment_context.h  |   3 +-
 .../pipeline/pipeline_x/pipeline_x_task.cpp   |   1 +
 .../java/org/apache/doris/qe/Coordinator.java |  16 +--
 gensrc/thrift/PaloInternalService.thrift      |   1 +
 10 files changed, 106 insertions(+), 54 deletions(-)

diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp
index 96bda3cd0b..e1e17e0ccf 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -1431,6 +1431,7 @@ Status ScanLocalState<Derived>::close(RuntimeState* state) {
     if (_closed) {
         return Status::OK();
     }
+    COUNTER_UPDATE(exec_time_counter(), _scan_dependency->watcher_elapse_time());
     SCOPED_TIMER(_close_timer);
     SCOPED_TIMER(exec_time_counter());
 
diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h
index 68213fe9c2..5d623f899a 100644
--- a/be/src/pipeline/pipeline.h
+++ b/be/src/pipeline/pipeline.h
@@ -50,8 +50,9 @@ class Pipeline : public std::enable_shared_from_this<Pipeline> {
 public:
     Pipeline() = delete;
-    explicit Pipeline(PipelineId pipeline_id, std::weak_ptr<PipelineFragmentContext> context)
-            : _pipeline_id(pipeline_id), _context(context) {
+    explicit Pipeline(PipelineId pipeline_id, int num_tasks,
+                      std::weak_ptr<PipelineFragmentContext> context)
+            : _pipeline_id(pipeline_id), _context(context), _num_tasks(num_tasks) {
         _init_profile();
     }
 
@@ -138,6 +139,11 @@ public:
     void set_children(std::shared_ptr<Pipeline> child) { _children.push_back(child); }
     void set_children(std::vector<std::shared_ptr<Pipeline>> children) { _children = children; }
 
+    void incr_created_tasks() { _num_tasks_created++; }
+    bool need_to_create_task() const { return _num_tasks > _num_tasks_created; }
+    void set_num_tasks(int num_tasks) { _num_tasks = num_tasks; }
+    int num_tasks() const { return _num_tasks; }
+
 private:
     void _init_profile();
 
@@ -199,6 +205,11 @@ private:
     // then set `_need_to_local_shuffle` to false, which means we should use local shuffle in this fragment
     // because data has already been partitioned by storage/shuffling.
     bool _need_to_local_shuffle = true;
+
+    // How many tasks should be created?
+    int _num_tasks = 1;
+    // How many tasks have already been created?
+    int _num_tasks_created = 0;
 };
 
 } // namespace doris::pipeline
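The two new counters are the core bookkeeping of this patch: `_num_tasks` is how many tasks a pipeline wants across all fragment instances, and `_num_tasks_created` is how many have been built so far. Below is a minimal sketch of how a per-instance build loop consumes them; `ToyPipeline` is a hypothetical stand-in, not the real doris::pipeline::Pipeline.

#include <cassert>

// Toy model of the new Pipeline bookkeeping: each fragment instance asks
// need_to_create_task() before building a task, and the task constructor
// bumps the counter via incr_created_tasks().
class ToyPipeline {
public:
    explicit ToyPipeline(int num_tasks) : _num_tasks(num_tasks) {}
    bool need_to_create_task() const { return _num_tasks > _num_tasks_created; }
    void incr_created_tasks() { _num_tasks_created++; }
    void set_num_tasks(int n) { _num_tasks = n; }

private:
    int _num_tasks = 1;
    int _num_tasks_created = 0;
};

int main() {
    ToyPipeline p(/*num_tasks=*/1);   // e.g. a shared-scan pipeline pinned to one task
    assert(p.need_to_create_task());  // instance 0 creates the only task
    p.incr_created_tasks();
    assert(!p.need_to_create_task()); // instances 1..N-1 skip this pipeline
    return 0;
}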
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index b800e167bc..8bf884692d 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -189,7 +189,7 @@ void PipelineFragmentContext::cancel(const PPlanFragmentCancelReason& reason,
 PipelinePtr PipelineFragmentContext::add_pipeline() {
     // _prepared, _submitted, _canceled should not add pipeline
     PipelineId id = _next_pipeline_id++;
-    auto pipeline = std::make_shared<Pipeline>(id, weak_from_this());
+    auto pipeline = std::make_shared<Pipeline>(id, _num_instances, weak_from_this());
     _pipelines.emplace_back(pipeline);
     return pipeline;
 }
@@ -197,7 +197,7 @@ PipelinePtr PipelineFragmentContext::add_pipeline() {
 PipelinePtr PipelineFragmentContext::add_pipeline(PipelinePtr parent, int idx) {
     // _prepared, _submitted, _canceled should not add pipeline
     PipelineId id = _next_pipeline_id++;
-    auto pipeline = std::make_shared<Pipeline>(id, weak_from_this());
+    auto pipeline = std::make_shared<Pipeline>(id, _num_instances, weak_from_this());
     if (idx >= 0) {
         _pipelines.insert(_pipelines.begin() + idx, pipeline);
     } else {
diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h
index c8883248d4..a705230d2f 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -225,6 +225,7 @@ protected:
     report_status_callback _report_status_cb;
 
     DescriptorTbl* _desc_tbl = nullptr;
+    int _num_instances = 1;
 
 private:
     static bool _has_inverted_index_or_partial_update(TOlapTableSink sink);
diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h
index 3993289d5d..8c28469b50 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h
@@ -27,8 +27,6 @@ class LocalExchangeSinkLocalState;
 
 class Exchanger {
 public:
-    Exchanger(int num_partitions)
-            : _running_sink_operators(num_partitions), _num_partitions(num_partitions) {}
     Exchanger(int running_sink_operators, int num_partitions)
             : _running_sink_operators(running_sink_operators), _num_partitions(num_partitions) {}
     virtual ~Exchanger() = default;
@@ -56,9 +54,6 @@ class ShuffleExchanger : public Exchanger {
 public:
     ENABLE_FACTORY_CREATOR(ShuffleExchanger);
-    ShuffleExchanger(int num_partitions) : Exchanger(num_partitions) {
-        _data_queue.resize(num_partitions);
-    }
     ShuffleExchanger(int running_sink_operators, int num_partitions)
             : Exchanger(running_sink_operators, num_partitions) {
         _data_queue.resize(num_partitions);
@@ -90,8 +85,9 @@ class BucketShuffleExchanger : public ShuffleExchanger {
 class PassthroughExchanger final : public Exchanger {
 public:
     ENABLE_FACTORY_CREATOR(PassthroughExchanger);
-    PassthroughExchanger(int num_instances) : Exchanger(num_instances) {
-        _data_queue.resize(num_instances);
+    PassthroughExchanger(int running_sink_operators, int num_partitions)
+            : Exchanger(running_sink_operators, num_partitions) {
+        _data_queue.resize(num_partitions);
     }
     ~PassthroughExchanger() override = default;
    Status sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state,
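Dropping the single-argument constructors forces every call site to say explicitly how many sink tasks will feed the exchanger, which no longer has to equal the partition count. The sketch below uses toy types (a hypothetical `ToyExchanger`, not the Doris API) to show why the distinction matters: sources reach end-of-stream only when the last sink task finishes, so under-counting sinks would end the stream early and over-counting would hang the sources.

#include <atomic>
#include <cassert>
#include <vector>

// Minimal sketch: the sink count and the partition count are tracked
// separately once a pipeline's task count can differ from _num_instances.
class ToyExchanger {
public:
    ToyExchanger(int running_sink_operators, int num_partitions)
            : _running_sink_operators(running_sink_operators),
              _queues(num_partitions) {}

    // Each sink task calls this exactly once when it finishes.
    void sink_finished() { _running_sink_operators.fetch_sub(1); }

    // A source is exhausted only when its queue is drained AND all sinks are done.
    bool eos(int partition) const {
        return _queues[partition].empty() && _running_sink_operators.load() == 0;
    }

    void push(int partition, int block) { _queues[partition].push_back(block); }

private:
    std::atomic<int> _running_sink_operators;
    std::vector<std::vector<int>> _queues;
};

int main() {
    // One sink task (e.g. a shared-scan pipeline pinned to a single task)
    // feeding four source partitions: the two counts differ by design.
    ToyExchanger ex(/*running_sink_operators=*/1, /*num_partitions=*/4);
    ex.push(0, 42);
    ex.sink_finished();
    assert(!ex.eos(0) && ex.eos(1));
    return 0;
}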
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index 294b9fc109..6a3f38d2c2 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -505,11 +505,14 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
         return le_state_map;
     };
     for (auto& pipeline : _pipelines) {
-        auto task = std::make_unique<PipelineXTask>(
-                pipeline, _total_tasks++, _runtime_states[i].get(), this,
-                _runtime_states[i]->runtime_profile(), get_local_exchange_state(pipeline), i);
-        pipeline_id_to_task.insert({pipeline->id(), task.get()});
-        _tasks[i].emplace_back(std::move(task));
+        if (pipeline->need_to_create_task()) {
+            auto task = std::make_unique<PipelineXTask>(pipeline, _total_tasks++,
+                                                        _runtime_states[i].get(), this,
+                                                        _runtime_states[i]->runtime_profile(),
+                                                        get_local_exchange_state(pipeline), i);
+            pipeline_id_to_task.insert({pipeline->id(), task.get()});
+            _tasks[i].emplace_back(std::move(task));
+        }
     }
 
     /**
@@ -554,18 +557,22 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
     };
 
     for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
-        auto task = pipeline_id_to_task[_pipelines[pip_idx]->id()];
-        DCHECK(task != nullptr);
+        if (pipeline_id_to_task.contains(_pipelines[pip_idx]->id())) {
+            auto task = pipeline_id_to_task[_pipelines[pip_idx]->id()];
+            DCHECK(task != nullptr);
 
-        // if this task has upstream dependency, then record them.
-        if (_dag.find(_pipelines[pip_idx]->id()) != _dag.end()) {
-            auto& deps = _dag[_pipelines[pip_idx]->id()];
-            for (auto& dep : deps) {
-                task->add_upstream_dependency(
-                        pipeline_id_to_task[dep]->get_downstream_dependency());
+            // if this task has upstream dependencies, record them.
+            if (_dag.find(_pipelines[pip_idx]->id()) != _dag.end()) {
+                auto& deps = _dag[_pipelines[pip_idx]->id()];
+                for (auto& dep : deps) {
+                    if (pipeline_id_to_task.contains(dep)) {
+                        task->add_upstream_dependency(
+                                pipeline_id_to_task[dep]->get_downstream_dependency());
+                    }
+                }
             }
+            RETURN_IF_ERROR(prepare_and_set_parent_profile(task));
         }
-        RETURN_IF_ERROR(prepare_and_set_parent_profile(task));
     }
 
     {
@@ -653,6 +660,34 @@ Status PipelineXFragmentContext::_create_tree_helper(ObjectPool* pool,
     return Status::OK();
 }
 
+void PipelineXFragmentContext::_inherit_pipeline_properties(ExchangeType exchange_type,
+                                                            PipelinePtr pipe_with_source,
+                                                            PipelinePtr pipe_with_sink) {
+    pipe_with_sink->set_num_tasks(pipe_with_source->num_tasks());
+    pipe_with_source->set_num_tasks(_num_instances);
+    switch (exchange_type) {
+    case ExchangeType::HASH_SHUFFLE:
+        // If a HASH_SHUFFLE local exchanger is planned, data will always be hash-distributed,
+        // so we do not need to plan another shuffle local exchange later in this pipeline.
+        pipe_with_sink->set_need_to_local_shuffle(false);
+        pipe_with_source->set_need_to_local_shuffle(false);
+        break;
+    case ExchangeType::BUCKET_HASH_SHUFFLE:
+        // Same as ExchangeType::HASH_SHUFFLE.
+        pipe_with_sink->set_need_to_local_shuffle(false);
+        pipe_with_source->set_need_to_local_shuffle(false);
+        break;
+    case ExchangeType::PASSTHROUGH:
+        // If a PASSTHROUGH local exchanger is planned, data will be split randomly, so we
+        // must make sure the remaining operators use local shuffle to fix the distribution.
+        pipe_with_sink->set_need_to_local_shuffle(pipe_with_source->need_to_local_shuffle());
+        pipe_with_source->set_need_to_local_shuffle(true);
+        break;
+    default:
+        __builtin_unreachable();
+    }
+}
+
 Status PipelineXFragmentContext::_add_local_exchange(
         int pip_idx, int idx, int node_id, ObjectPool* pool, PipelinePtr cur_pipe,
         const std::vector<TExpr>& texprs, ExchangeType exchange_type, bool* do_local_exchange,
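The num_tasks handoff at the top of `_inherit_pipeline_properties` is what lets one tablet be read by a single task while the rest of the fragment still runs at full parallelism: the sink side of the split keeps the original pipeline's task count, and the source side is widened back to `_num_instances`. A toy trace under assumed values (`num_instances = 8`, hypothetical `ToyPipeline` stand-ins, not the real classes):

#include <cassert>

struct ToyPipeline {
    int num_tasks = 1;
    bool need_to_local_shuffle = true;
};

enum class ToyExchangeType { HASH_SHUFFLE, PASSTHROUGH };

void inherit(ToyExchangeType t, ToyPipeline& source_pipe, ToyPipeline& sink_pipe,
             int num_instances) {
    // The sink side keeps the original pipeline's parallelism ...
    sink_pipe.num_tasks = source_pipe.num_tasks;
    // ... and the source side fans back out to full parallelism.
    source_pipe.num_tasks = num_instances;
    switch (t) {
    case ToyExchangeType::HASH_SHUFFLE:
        source_pipe.need_to_local_shuffle = sink_pipe.need_to_local_shuffle = false;
        break;
    case ToyExchangeType::PASSTHROUGH:
        sink_pipe.need_to_local_shuffle = source_pipe.need_to_local_shuffle;
        source_pipe.need_to_local_shuffle = true;
        break;
    }
}

int main() {
    ToyPipeline scan_side; // ends in LocalExchangeSink (shared scan: 1 task)
    ToyPipeline agg_side;  // starts at LocalExchangeSource (downstream operators)
    inherit(ToyExchangeType::PASSTHROUGH, agg_side, scan_side, /*num_instances=*/8);
    assert(scan_side.num_tasks == 1 && agg_side.num_tasks == 8);
    return 0;
}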
@@ -674,59 +709,58 @@ Status PipelineXFragmentContext::_add_local_exchange(
     auto local_exchange_id = next_operator_id();
     // 1. Create a new pipeline with local exchange sink.
     auto new_pip = add_pipeline(cur_pipe, pip_idx + 1);
-
     DataSinkOperatorXPtr sink;
     sink.reset(new LocalExchangeSinkOperatorX(next_sink_operator_id(), local_exchange_id,
                                               _num_instances, texprs, bucket_seq_to_instance_idx));
     RETURN_IF_ERROR(new_pip->set_sink(sink));
+    RETURN_IF_ERROR(new_pip->sink_x()->init(exchange_type, num_buckets));
+
+    // 2. Inherit properties from current pipeline.
+    _inherit_pipeline_properties(exchange_type, cur_pipe, new_pip);
+
+    // 3. Create and initialize LocalExchangeSharedState.
     auto shared_state = LocalExchangeSharedState::create_shared();
     shared_state->source_dependencies.resize(_num_instances, nullptr);
     switch (exchange_type) {
     case ExchangeType::HASH_SHUFFLE:
-        shared_state->exchanger = ShuffleExchanger::create_unique(_num_instances);
-        // If HASH_SHUFFLE local exchanger is planned, data will be always HASH distribution so we
-        // do not need to plan another shuffle local exchange in the rest of current pipeline.
-        new_pip->set_need_to_local_shuffle(false);
-        cur_pipe->set_need_to_local_shuffle(false);
+        shared_state->exchanger =
+                ShuffleExchanger::create_unique(new_pip->num_tasks(), _num_instances);
         break;
     case ExchangeType::BUCKET_HASH_SHUFFLE:
         shared_state->exchanger =
-                BucketShuffleExchanger::create_unique(_num_instances, num_buckets);
-        // Same as ExchangeType::HASH_SHUFFLE.
-        new_pip->set_need_to_local_shuffle(false);
-        cur_pipe->set_need_to_local_shuffle(false);
+                BucketShuffleExchanger::create_unique(new_pip->num_tasks(), num_buckets);
         break;
     case ExchangeType::PASSTHROUGH:
-        // If PASSTHROUGH local exchanger is planned, data will be split randomly. So we should make
-        // sure remaining operators should use local shuffle to make data distribution right.
-        shared_state->exchanger = PassthroughExchanger::create_unique(_num_instances);
-        new_pip->set_need_to_local_shuffle(cur_pipe->need_to_local_shuffle());
-        cur_pipe->set_need_to_local_shuffle(true);
+        shared_state->exchanger =
+                PassthroughExchanger::create_unique(new_pip->num_tasks(), _num_instances);
         break;
     default:
         return Status::InternalError("Unsupported local exchange type : " +
                                      std::to_string((int)exchange_type));
     }
-    RETURN_IF_ERROR(new_pip->sink_x()->init(exchange_type, num_buckets));
     _op_id_to_le_state.insert({local_exchange_id, shared_state});
 
-    // 2. Initialize operators list.
+    // 4. Set the two pipelines' operator lists. For example, split pipeline [Scan - AggSink]
+    // into pipeline1 [Scan - LocalExchangeSink] and pipeline2 [LocalExchangeSource - AggSink].
+
+    // 4.1 Initialize the new pipeline's operator list.
     std::copy(operator_xs.begin(), operator_xs.begin() + idx,
               std::inserter(new_pip->operator_xs(), new_pip->operator_xs().end()));
 
-    // 3. Erase operators in new pipeline.
+    // 4.2 Erase unused operators in the previous pipeline.
+    operator_xs.erase(operator_xs.begin(), operator_xs.begin() + idx);
+
+    // 5. Initialize LocalExchangeSource and insert it into this pipeline.
     OperatorXPtr source_op;
     source_op.reset(new LocalExchangeSourceOperatorX(pool, local_exchange_id));
     RETURN_IF_ERROR(source_op->init(exchange_type));
-    operator_xs.erase(operator_xs.begin(), operator_xs.begin() + idx);
     if (operator_xs.size() > 0) {
         RETURN_IF_ERROR(operator_xs.front()->set_child(source_op));
     }
-
     operator_xs.insert(operator_xs.begin(), source_op);
 
     RETURN_IF_ERROR(source_op->set_child(new_pip->operator_xs().back()));
 
+    // 6. Set children for the two pipelines separately.
     std::vector<std::shared_ptr<Pipeline>> new_children;
     std::vector<PipelineId> edges_with_source;
     for (auto child : cur_pipe->children()) {
@@ -745,6 +779,7 @@ Status PipelineXFragmentContext::_add_local_exchange(
     new_children.push_back(new_pip);
     edges_with_source.push_back(new_pip->id());
 
+    // 7. Set DAG for new pipelines.
     if (!new_pip->children().empty()) {
         std::vector<PipelineId> edges_with_sink;
         for (auto child : new_pip->children()) {
@@ -773,6 +808,11 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
     case TPlanNodeType::OLAP_SCAN_NODE: {
         op.reset(new OlapScanOperatorX(pool, tnode, next_operator_id(), descs));
         RETURN_IF_ERROR(cur_pipe->add_operator(op));
+        const bool shared_scan =
+                find_with_default(request.per_node_shared_scans, op->node_id(), false);
+        if (shared_scan) {
+            cur_pipe->set_num_tasks(1);
+        }
         break;
     }
     case doris::TPlanNodeType::JDBC_SCAN_NODE: {
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
index c8b042bd39..3719445bab 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
@@ -129,6 +129,8 @@ private:
                               PipelinePtr cur_pipe, const std::vector<TExpr>& texprs,
                               ExchangeType exchange_type, bool* do_local_exchange, int num_buckets,
                               const std::map<int, int>& bucket_seq_to_instance_idx);
+    void _inherit_pipeline_properties(ExchangeType exchange_type, PipelinePtr pipe_with_source,
+                                      PipelinePtr pipe_with_sink);
 
     [[nodiscard]] Status _build_pipelines(ObjectPool* pool,
                                           const doris::TPipelineFragmentParams& request,
@@ -217,7 +219,6 @@ private:
 
     int _operator_id = 0;
     int _sink_operator_id = 0;
-    int _num_instances = 0;
     std::map<int, std::shared_ptr<LocalExchangeSharedState>> _op_id_to_le_state;
 };
 
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
index 76e2c21d38..7657d82b7f 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
@@ -62,6 +62,7 @@ PipelineXTask::PipelineXTask(PipelinePtr& pipeline, uint32_t task_id, RuntimeSta
     for (auto& op : _operators) {
         _source_dependency.insert({op->operator_id(), op->get_dependency(state->get_query_ctx())});
     }
+    pipeline->incr_created_tasks();
 }
 
 Status PipelineXTask::prepare(RuntimeState* state, const TPipelineInstanceParams& local_params,
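On the BE, the whole feature is driven by the `per_node_shared_scans` lookup in `_create_operator` above. Here is a self-contained sketch of that gate, with a stand-in `find_with_default` (the real helper comes from Doris' gutil map utilities; the signature here is an assumption for illustration):

#include <map>

// Stand-in for find_with_default: value lookup with a fallback default.
template <typename Map>
typename Map::mapped_type find_with_default(const Map& m,
                                            const typename Map::key_type& key,
                                            const typename Map::mapped_type& dflt) {
    auto it = m.find(key);
    return it == m.end() ? dflt : it->second;
}

int main() {
    // per_node_shared_scans as shipped in TPipelineFragmentParams: node id -> shared?
    std::map<int, bool> per_node_shared_scans = {{3, true}};
    const int scan_node_id = 3;
    int num_tasks = 8;  // fragment parallelism (_num_instances)
    if (find_with_default(per_node_shared_scans, scan_node_id, false)) {
        num_tasks = 1;  // shared scan: one task reads the tablet, the exchanger fans out
    }
    return num_tasks == 1 ? 0 : 1;
}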
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index cd498ccf9b..5188412bd3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -1995,8 +1995,7 @@ public class Coordinator implements CoordInterface {
                 // 4. Disable shared scan optimization by session variable
                 if (!enablePipelineEngine || (node.isPresent() && node.get().getShouldColoScan())
                         || (node.isPresent() && node.get() instanceof FileScanNode)
-                        || (node.isPresent() && node.get().shouldDisableSharedScan(context))
-                        || enablePipelineXEngine) {
+                        || (node.isPresent() && node.get().shouldDisableSharedScan(context))) {
                     int expectedInstanceNum = 1;
                     if (parallelExecInstanceNum > 1) {
                         // the scan instance num should not be larger than the tablets num
@@ -3599,6 +3598,12 @@ public class Coordinator implements CoordInterface {
            Map<TNetworkAddress, Integer> instanceIdx = new HashMap<>();
            for (int i = 0; i < instanceExecParams.size(); ++i) {
                final FInstanceExecParam instanceExecParam = instanceExecParams.get(i);
+               Map<Integer, List<TScanRangeParams>> scanRanges = instanceExecParam.perNodeScanRanges;
+               Map<Integer, Boolean> perNodeSharedScans = instanceExecParam.perNodeSharedScans;
+               if (scanRanges == null) {
+                   scanRanges = Maps.newHashMap();
+                   perNodeSharedScans = Maps.newHashMap();
+               }
                if (!res.containsKey(instanceExecParam.host)) {
                    TPipelineFragmentParams params = new TPipelineFragmentParams();
 
@@ -3624,6 +3629,7 @@ public class Coordinator implements CoordInterface {
                    params.setFileScanParams(fileScanRangeParamsMap);
                    params.setNumBuckets(fragment.getBucketNum());
+                   params.setPerNodeSharedScans(perNodeSharedScans);
                    res.put(instanceExecParam.host, params);
                    res.get(instanceExecParam.host).setBucketSeqToInstanceIdx(new HashMap<>());
                    instanceIdx.put(instanceExecParam.host, 0);
@@ -3641,12 +3647,6 @@ public class Coordinator implements CoordInterface {
                localParams.setBuildHashTableForBroadcastJoin(instanceExecParam.buildHashTableForBroadcastJoin);
                localParams.setFragmentInstanceId(instanceExecParam.instanceId);
-               Map<Integer, List<TScanRangeParams>> scanRanges = instanceExecParam.perNodeScanRanges;
-               Map<Integer, Boolean> perNodeSharedScans = instanceExecParam.perNodeSharedScans;
-               if (scanRanges == null) {
-                   scanRanges = Maps.newHashMap();
-                   perNodeSharedScans = Maps.newHashMap();
-               }
                localParams.setPerNodeScanRanges(scanRanges);
                localParams.setPerNodeSharedScans(perNodeSharedScans);
                localParams.setSenderId(i);
diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift
index fb702f6113..b6b96eee95 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -691,6 +691,7 @@ struct TPipelineFragmentParams {
   33: optional i32 num_local_sink
   34: optional i32 num_buckets
   35: optional map<i32, i32> bucket_seq_to_instance_idx
+  36: optional map<i32, bool> per_node_shared_scans
 }
 
 struct TPipelineFragmentParamsList {
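Putting the pieces together, the expected task count per fragment becomes the sum of each pipeline's `num_tasks` rather than `num_pipelines * num_instances`. A back-of-the-envelope check under assumed values (8 instances, one shared-scan pipeline split off by a local exchange):

#include <cassert>
#include <vector>

int main() {
    const int num_instances = 8;
    // num_tasks per pipeline after the split: the scan side is pinned to one
    // task, and the source side is widened back to num_instances.
    std::vector<int> pipeline_num_tasks = {1, 8};
    int total_tasks = 0;
    for (int i = 0; i < num_instances; i++) {          // per-instance build loop
        for (int& remaining : pipeline_num_tasks) {
            // mirrors: if (pipeline->need_to_create_task()) { ...; incr_created_tasks(); }
            if (remaining > 0) {
                total_tasks++;
                remaining--;
            }
        }
    }
    assert(total_tasks == 9);  // 1 scan task + 8 downstream tasks, not 16
    return 0;
}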