diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp index ae8974fb42..b2a70b8bd7 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.cpp +++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp @@ -583,12 +583,4 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block* return Status::OK(); } -Status HashJoinBuildSinkOperatorX::close(RuntimeState* state) { - if (!is_closed()) { - _shared_hash_table_context = nullptr; - _is_closed = true; - } - return JoinBuildSinkOperatorX::close(state); -} - } // namespace doris::pipeline diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h b/be/src/pipeline/exec/hashjoin_build_sink.h index 4bd1bb2007..faafd2932e 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.h +++ b/be/src/pipeline/exec/hashjoin_build_sink.h @@ -122,9 +122,22 @@ public: Status sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state) override; - Status close(RuntimeState* state) override; - bool can_write(RuntimeState* state) override { return true; } + bool can_write(RuntimeState* state) override { + if (state->get_sink_local_state(id()) + ->cast() + ._should_build_hash_table) { + return true; + } + return _shared_hash_table_context && _shared_hash_table_context->signaled; + } + + bool should_dry_run(RuntimeState* state) override { + auto tmp = _is_broadcast_join && !state->get_sink_local_state(id()) + ->cast() + ._should_build_hash_table; + return tmp; + } private: friend class HashJoinBuildSinkLocalState; diff --git a/be/src/pipeline/pipeline_x/operator.h b/be/src/pipeline/pipeline_x/operator.h index d0dddda09b..6658df039b 100644 --- a/be/src/pipeline/pipeline_x/operator.h +++ b/be/src/pipeline/pipeline_x/operator.h @@ -440,6 +440,8 @@ public: Status finalize(RuntimeState* state) override { return Status::OK(); } + virtual bool should_dry_run(RuntimeState* state) { return false; } + protected: const int _id; std::string _name; diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp index f55c11982e..e401b99e8c 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp @@ -111,6 +111,7 @@ Status PipelineXTask::_open() { } LocalSinkStateInfo info {_sender_id, _downstream_dependency.get(), _sender}; RETURN_IF_ERROR(_sink->setup_local_state(_state, info)); + _dry_run = _sink->should_dry_run(_state); RETURN_IF_ERROR(st); _opened = true; return Status::OK(); @@ -176,11 +177,14 @@ Status PipelineXTask::execute(bool* eos) { auto* block = _block.get(); // Pull block from operator chain - { + if (!_dry_run) { SCOPED_TIMER(_get_block_timer); _get_block_counter->update(1); RETURN_IF_ERROR(_root->get_next_after_projects(_state, block, _data_state)); + } else { + _data_state = SourceState::FINISHED; } + *eos = _data_state == SourceState::FINISHED; if (_block->rows() != 0 || *eos) { SCOPED_TIMER(_sink_timer); diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.h b/be/src/pipeline/pipeline_x/pipeline_x_task.h index 0342d4b0fc..de865876be 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_task.h +++ b/be/src/pipeline/pipeline_x/pipeline_x_task.h @@ -67,7 +67,7 @@ public: Status close() override; bool source_can_read() override { - if (_pipeline->_always_can_read) { + if (_dry_run) { return true; } for (auto& op : _operators) { @@ -82,9 +82,7 @@ public: return _source->runtime_filters_are_ready_or_timeout(); } - bool sink_can_write() override { - return _sink->can_write(_state) || _pipeline->_always_can_write; - } + bool sink_can_write() override { return _sink->can_write(_state); } Status finalize() override; @@ -132,5 +130,6 @@ private: std::shared_ptr _sender; std::shared_ptr _recvr; + bool _dry_run = false; }; } // namespace doris::pipeline diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index b56d92fc83..4c8d4edf7e 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -694,6 +694,7 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params, const bool enable_pipeline_x = params.query_options.__isset.enable_pipeline_x_engine && params.query_options.enable_pipeline_x_engine; if (enable_pipeline_x) { + _setup_shared_hashtable_for_broadcast_join(params, query_ctx.get()); int64_t duration_ns = 0; std::shared_ptr context = std::make_shared( @@ -733,8 +734,6 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params, !params.need_wait_execution_trigger) { query_ctx->set_ready_to_execute_only(); } - _setup_shared_hashtable_for_broadcast_join(params, params.local_params[i], - query_ctx.get()); } { std::lock_guard lock(_lock); @@ -1323,8 +1322,7 @@ void FragmentMgr::_setup_shared_hashtable_for_broadcast_join(const TExecPlanFrag if (params.build_hash_table_for_broadcast_join) { query_ctx->get_shared_hash_table_controller()->set_builder_and_consumers( - params.params.fragment_instance_id, params.instances_sharing_hash_table, - node.node_id); + params.params.fragment_instance_id, node.node_id); } } } @@ -1350,8 +1348,34 @@ void FragmentMgr::_setup_shared_hashtable_for_broadcast_join( if (local_params.build_hash_table_for_broadcast_join) { query_ctx->get_shared_hash_table_controller()->set_builder_and_consumers( - local_params.fragment_instance_id, params.instances_sharing_hash_table, - node.node_id); + local_params.fragment_instance_id, node.node_id); + } + } +} + +void FragmentMgr::_setup_shared_hashtable_for_broadcast_join(const TPipelineFragmentParams& params, + QueryContext* query_ctx) { + if (!params.query_options.__isset.enable_share_hash_table_for_broadcast_join || + !params.query_options.enable_share_hash_table_for_broadcast_join) { + return; + } + + if (!params.__isset.fragment || !params.fragment.__isset.plan || + params.fragment.plan.nodes.empty()) { + return; + } + for (auto& node : params.fragment.plan.nodes) { + if (node.node_type != TPlanNodeType::HASH_JOIN_NODE || + !node.hash_join_node.__isset.is_broadcast_join || + !node.hash_join_node.is_broadcast_join) { + continue; + } + + for (auto& local_param : params.local_params) { + if (local_param.build_hash_table_for_broadcast_join) { + query_ctx->get_shared_hash_table_controller()->set_builder_and_consumers( + local_param.fragment_instance_id, node.node_id); + } } } } diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h index e31a5925da..367661dda4 100644 --- a/be/src/runtime/fragment_mgr.h +++ b/be/src/runtime/fragment_mgr.h @@ -167,6 +167,9 @@ private: const TPipelineInstanceParams& local_params, QueryContext* query_ctx); + void _setup_shared_hashtable_for_broadcast_join(const TPipelineFragmentParams& params, + QueryContext* query_ctx); + template Status _get_query_ctx(const Params& params, TUniqueId query_id, bool pipeline, std::shared_ptr& query_ctx); diff --git a/be/src/vec/runtime/shared_hash_table_controller.cpp b/be/src/vec/runtime/shared_hash_table_controller.cpp index 490a2cfdcd..869c3d7ec0 100644 --- a/be/src/vec/runtime/shared_hash_table_controller.cpp +++ b/be/src/vec/runtime/shared_hash_table_controller.cpp @@ -26,32 +26,24 @@ namespace doris { namespace vectorized { -void SharedHashTableController::set_builder_and_consumers(TUniqueId builder, - const std::vector& consumers, - int node_id) { +void SharedHashTableController::set_builder_and_consumers(TUniqueId builder, int node_id) { // Only need to set builder and consumers with pipeline engine enabled. DCHECK(_pipeline_engine_enabled); std::lock_guard lock(_mutex); DCHECK(_builder_fragment_ids.find(node_id) == _builder_fragment_ids.cend()); _builder_fragment_ids.insert({node_id, builder}); - _ref_fragments[node_id].assign(consumers.cbegin(), consumers.cend()); } bool SharedHashTableController::should_build_hash_table(const TUniqueId& fragment_instance_id, int my_node_id) { std::lock_guard lock(_mutex); auto it = _builder_fragment_ids.find(my_node_id); - if (_pipeline_engine_enabled) { - if (it != _builder_fragment_ids.cend()) { - return it->second == fragment_instance_id; - } - return false; - } - - if (it == _builder_fragment_ids.cend()) { - _builder_fragment_ids.insert({my_node_id, fragment_instance_id}); - return true; + DCHECK(_pipeline_engine_enabled && it != _builder_fragment_ids.cend()); + if (it != _builder_fragment_ids.cend()) { + return it->second == fragment_instance_id; } + throw Exception(ErrorCode::INTERNAL_ERROR, + "Shared hash table for node {} has not been initialized!", my_node_id); return false; } diff --git a/be/src/vec/runtime/shared_hash_table_controller.h b/be/src/vec/runtime/shared_hash_table_controller.h index 0f2517c46c..f76121e7c8 100644 --- a/be/src/vec/runtime/shared_hash_table_controller.h +++ b/be/src/vec/runtime/shared_hash_table_controller.h @@ -67,8 +67,7 @@ using SharedHashTableContextPtr = std::shared_ptr; class SharedHashTableController { public: /// set hash table builder's fragment instance id and consumers' fragment instance id - void set_builder_and_consumers(TUniqueId builder, const std::vector& consumers, - int node_id); + void set_builder_and_consumers(TUniqueId builder, int node_id); TUniqueId get_builder_fragment_instance_id(int my_node_id); SharedHashTableContextPtr get_context(int my_node_id); void signal(int my_node_id); @@ -81,7 +80,6 @@ private: bool _pipeline_engine_enabled = false; std::mutex _mutex; std::condition_variable _cv; - std::map> _ref_fragments; std::map _builder_fragment_ids; std::map _shared_contexts; };