From f7caae08d536b783355ec2087629bc8003bf578d Mon Sep 17 00:00:00 2001 From: Jerry Hu Date: Wed, 30 Aug 2023 18:58:59 +0800 Subject: [PATCH] [fix](union) should open/alloc_resource in sink operator instead of source (#23637) --- be/src/pipeline/exec/union_sink_operator.h | 2 -- be/src/pipeline/exec/union_source_operator.h | 3 +++ be/src/vec/exec/vunion_node.cpp | 10 +++++++++- be/src/vec/exec/vunion_node.h | 3 +++ 4 files changed, 15 insertions(+), 3 deletions(-) diff --git a/be/src/pipeline/exec/union_sink_operator.h b/be/src/pipeline/exec/union_sink_operator.h index dfe7085d5e..0da75147cc 100644 --- a/be/src/pipeline/exec/union_sink_operator.h +++ b/be/src/pipeline/exec/union_sink_operator.h @@ -56,8 +56,6 @@ public: Status sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state) override; - // this operator in sink open directly return, do this work in source - Status open(RuntimeState* /*state*/) override { return Status::OK(); } Status close(RuntimeState* state) override; diff --git a/be/src/pipeline/exec/union_source_operator.h b/be/src/pipeline/exec/union_source_operator.h index 8051bdd512..8bd2f484f3 100644 --- a/be/src/pipeline/exec/union_source_operator.h +++ b/be/src/pipeline/exec/union_source_operator.h @@ -52,6 +52,9 @@ public: UnionSourceOperator(OperatorBuilderBase* operator_builder, ExecNode* node, std::shared_ptr); + // this operator in source open directly return, do this work in sink + Status open(RuntimeState* /*state*/) override { return Status::OK(); } + Status get_block(RuntimeState* state, vectorized::Block* block, SourceState& source_state) override; bool can_read() override; diff --git a/be/src/vec/exec/vunion_node.cpp b/be/src/vec/exec/vunion_node.cpp index e9caa494fd..90cbe5bb7a 100644 --- a/be/src/vec/exec/vunion_node.cpp +++ b/be/src/vec/exec/vunion_node.cpp @@ -106,6 +106,12 @@ Status VUnionNode::open(RuntimeState* state) { Status VUnionNode::alloc_resource(RuntimeState* state) { SCOPED_TIMER(_runtime_profile->total_time_counter()); + + std::unique_lock l(_resource_lock); + if (_resource_allocated) { + return Status::OK(); + } + // open const expr lists. for (const auto& exprs : _const_expr_lists) { RETURN_IF_ERROR(VExpr::open(exprs, state)); @@ -114,7 +120,9 @@ Status VUnionNode::alloc_resource(RuntimeState* state) { for (const auto& exprs : _child_expr_lists) { RETURN_IF_ERROR(VExpr::open(exprs, state)); } - return ExecNode::alloc_resource(state); + RETURN_IF_ERROR(ExecNode::alloc_resource(state)); + _resource_allocated = true; + return Status::OK(); } Status VUnionNode::get_next_pass_through(RuntimeState* state, Block* block) { diff --git a/be/src/vec/exec/vunion_node.h b/be/src/vec/exec/vunion_node.h index c25bb07102..ac63f9ca63 100644 --- a/be/src/vec/exec/vunion_node.h +++ b/be/src/vec/exec/vunion_node.h @@ -91,6 +91,9 @@ private: /// to -1 if no child needs to be closed. int _to_close_child_idx; + std::mutex _resource_lock; + bool _resource_allocated {false}; + // Time spent to evaluates exprs and materializes the results RuntimeProfile::Counter* _materialize_exprs_evaluate_timer = nullptr; /// GetNext() for the passthrough case. We pass 'block' directly into the GetNext()