[fix](union) should open/alloc_resource in sink operator instead of source (#23637)

This commit is contained in:
Jerry Hu
2023-08-30 18:58:59 +08:00
committed by GitHub
parent 6d41272421
commit f7caae08d5
4 changed files with 15 additions and 3 deletions

View File

@ -56,8 +56,6 @@ public:
Status sink(RuntimeState* state, vectorized::Block* in_block,
SourceState source_state) override;
// this operator in sink open directly return, do this work in source
Status open(RuntimeState* /*state*/) override { return Status::OK(); }
Status close(RuntimeState* state) override;

View File

@ -52,6 +52,9 @@ public:
UnionSourceOperator(OperatorBuilderBase* operator_builder, ExecNode* node,
std::shared_ptr<DataQueue>);
// this operator in source open directly return, do this work in sink
Status open(RuntimeState* /*state*/) override { return Status::OK(); }
Status get_block(RuntimeState* state, vectorized::Block* block,
SourceState& source_state) override;
bool can_read() override;

View File

@ -106,6 +106,12 @@ Status VUnionNode::open(RuntimeState* state) {
Status VUnionNode::alloc_resource(RuntimeState* state) {
SCOPED_TIMER(_runtime_profile->total_time_counter());
std::unique_lock<std::mutex> l(_resource_lock);
if (_resource_allocated) {
return Status::OK();
}
// open const expr lists.
for (const auto& exprs : _const_expr_lists) {
RETURN_IF_ERROR(VExpr::open(exprs, state));
@ -114,7 +120,9 @@ Status VUnionNode::alloc_resource(RuntimeState* state) {
for (const auto& exprs : _child_expr_lists) {
RETURN_IF_ERROR(VExpr::open(exprs, state));
}
return ExecNode::alloc_resource(state);
RETURN_IF_ERROR(ExecNode::alloc_resource(state));
_resource_allocated = true;
return Status::OK();
}
Status VUnionNode::get_next_pass_through(RuntimeState* state, Block* block) {

View File

@ -91,6 +91,9 @@ private:
/// to -1 if no child needs to be closed.
int _to_close_child_idx;
std::mutex _resource_lock;
bool _resource_allocated {false};
// Time spent to evaluates exprs and materializes the results
RuntimeProfile::Counter* _materialize_exprs_evaluate_timer = nullptr;
/// GetNext() for the passthrough case. We pass 'block' directly into the GetNext()