[Bug](memleak) Fix emptyoperator may cause node not close (#20525)

This commit is contained in:
wangbo
2023-06-07 01:27:13 +08:00
committed by GitHub
parent b65094c8df
commit 3aa0c2bdbf
3 changed files with 18 additions and 5 deletions

View File

@ -21,7 +21,7 @@
namespace doris::pipeline {
OperatorPtr EmptySourceOperatorBuilder::build_operator() {
return std::make_shared<EmptySourceOperator>(this);
return std::make_shared<EmptySourceOperator>(this, _exec_node);
}
} // namespace doris::pipeline

View File

@ -37,8 +37,10 @@ namespace doris::pipeline {
class EmptySourceOperatorBuilder final : public OperatorBuilderBase {
public:
EmptySourceOperatorBuilder(int32_t id, const RowDescriptor& row_descriptor)
: OperatorBuilderBase(id, "EmptySourceOperator"), _row_descriptor(row_descriptor) {}
EmptySourceOperatorBuilder(int32_t id, const RowDescriptor& row_descriptor, ExecNode* exec_node)
: OperatorBuilderBase(id, "EmptySourceOperator"),
_row_descriptor(row_descriptor),
_exec_node(exec_node) {}
bool is_source() const override { return true; }
@ -48,11 +50,14 @@ public:
private:
RowDescriptor _row_descriptor;
ExecNode* _exec_node = nullptr;
};
class EmptySourceOperator final : public OperatorBase {
public:
EmptySourceOperator(OperatorBuilderBase* builder) : OperatorBase(builder) {}
EmptySourceOperator(OperatorBuilderBase* builder, ExecNode* exec_node)
: OperatorBase(builder), _exec_node(exec_node) {}
bool can_read() override { return true; }
bool is_pending_finish() const override { return false; }
@ -67,6 +72,14 @@ public:
}
Status sink(RuntimeState*, vectorized::Block*, SourceState) override { return Status::OK(); }
Status close(RuntimeState* state) override {
_exec_node->close(state);
return Status::OK();
}
private:
ExecNode* _exec_node = nullptr;
};
} // namespace doris::pipeline

View File

@ -596,7 +596,7 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur
RETURN_IF_ERROR(_build_pipelines(node->child(1), new_pipe));
} else {
OperatorBuilderPtr builder = std::make_shared<EmptySourceOperatorBuilder>(
next_operator_builder_id(), node->child(1)->row_desc());
next_operator_builder_id(), node->child(1)->row_desc(), node->child(1));
new_pipe->add_operator(builder);
}
OperatorBuilderPtr join_sink =