From 31a4f96f0167d7865ae10aaae579f0c34afad013 Mon Sep 17 00:00:00 2001 From: yiguolei <676222867@qq.com> Date: Wed, 14 Jun 2023 18:01:07 +0800 Subject: [PATCH] [refactor](exprcontext) move close to expr context's dector method (#20747) The close method does nothing. But I am not sure we could remove it. So that I add it to dector method and remove many many calls. --- be/src/exec/exec_node.cpp | 6 ---- be/src/olap/push_handler.cpp | 23 ------------- be/src/olap/schema_change.cpp | 2 -- be/src/runtime/fold_constant_executor.cpp | 3 -- be/src/service/point_query_executor.cpp | 6 +--- be/src/vec/common/sort/vsort_exec_exprs.cpp | 8 +---- .../format/parquet/vparquet_group_reader.cpp | 5 --- be/src/vec/exec/join/vhash_join_node.cpp | 6 ---- be/src/vec/exec/join/vjoin_node_base.cpp | 1 - .../vec/exec/join/vnested_loop_join_node.cpp | 4 --- be/src/vec/exec/scan/vfile_scanner.cpp | 34 ------------------- be/src/vec/exec/scan/vscan_node.cpp | 14 ++------ be/src/vec/exec/scan/vscanner.cpp | 11 ------ be/src/vec/exec/vaggregation_node.cpp | 1 - be/src/vec/exec/vanalytic_eval_node.cpp | 6 ---- be/src/vec/exec/vpartition_sort_node.cpp | 1 - be/src/vec/exec/vrepeat_node.cpp | 1 - be/src/vec/exec/vset_operation_node.cpp | 3 -- be/src/vec/exec/vtable_function_node.h | 2 -- be/src/vec/exec/vunion_node.cpp | 6 ---- be/src/vec/exprs/vbitmap_predicate.cpp | 5 ++- be/src/vec/exprs/vbitmap_predicate.h | 3 +- be/src/vec/exprs/vbloom_predicate.cpp | 5 ++- be/src/vec/exprs/vbloom_predicate.h | 3 +- be/src/vec/exprs/vcase_expr.cpp | 5 ++- be/src/vec/exprs/vcase_expr.h | 3 +- be/src/vec/exprs/vcast_expr.cpp | 5 ++- be/src/vec/exprs/vcast_expr.h | 3 +- be/src/vec/exprs/vectorized_agg_fn.cpp | 6 ++-- be/src/vec/exprs/vectorized_fn_call.cpp | 5 ++- be/src/vec/exprs/vectorized_fn_call.h | 3 +- be/src/vec/exprs/vexpr.cpp | 11 ++---- be/src/vec/exprs/vexpr.h | 5 +-- be/src/vec/exprs/vexpr_context.cpp | 23 +++++++------ be/src/vec/exprs/vexpr_context.h | 8 ++--- be/src/vec/exprs/vin_predicate.cpp | 5 ++- be/src/vec/exprs/vin_predicate.h | 3 +- be/src/vec/exprs/vmatch_predicate.cpp | 5 ++- be/src/vec/exprs/vmatch_predicate.h | 3 +- be/src/vec/exprs/vruntimefilter_wrapper.cpp | 4 +-- be/src/vec/exprs/vruntimefilter_wrapper.h | 3 +- be/src/vec/exprs/vschema_change_expr.cpp | 5 ++- be/src/vec/exprs/vschema_change_expr.h | 3 +- be/src/vec/sink/vdata_stream_sender.cpp | 1 - be/src/vec/sink/vmemory_scratch_sink.cpp | 1 - be/src/vec/sink/vresult_file_sink.cpp | 2 -- be/src/vec/sink/vresult_sink.cpp | 2 -- be/src/vec/sink/vtable_sink.cpp | 1 - be/src/vec/sink/vtablet_sink.cpp | 7 +--- .../serde/data_type_serde_mysql_test.cpp | 3 -- be/test/vec/exprs/vexpr_test.cpp | 2 -- 51 files changed, 56 insertions(+), 230 deletions(-) diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp index 2707e47a76..8e91212eb2 100644 --- a/be/src/exec/exec_node.cpp +++ b/be/src/exec/exec_node.cpp @@ -182,12 +182,6 @@ void ExecNode::release_resource(doris::RuntimeState* state) { COUNTER_SET(_rows_returned_counter, _num_rows_returned); } - for (auto& conjunct : _conjuncts) { - conjunct->close(state); - } - - vectorized::VExpr::close(_projections, state); - runtime_profile()->add_to_span(_span); _is_resource_released = true; } diff --git a/be/src/olap/push_handler.cpp b/be/src/olap/push_handler.cpp index 1fcb1abcdc..1a92f03829 100644 --- a/be/src/olap/push_handler.cpp +++ b/be/src/olap/push_handler.cpp @@ -413,29 +413,6 @@ Status PushBrokerReader::next(vectorized::Block* block) { Status PushBrokerReader::close() { _ready = false; - for (auto ctx : _dest_expr_ctxs) { - if (ctx != nullptr) { - ctx->close(_runtime_state.get()); - } - } - - for (auto& expr : _push_down_exprs) { - expr->close(_runtime_state.get()); - } - - for (auto& [k, v] : _slot_id_to_filter_conjuncts) { - for (auto& ctx : v) { - if (ctx != nullptr) { - ctx->close(_runtime_state.get()); - } - } - } - - for (auto& ctx : _not_single_slot_filter_conjuncts) { - if (ctx != nullptr) { - ctx->close(_runtime_state.get()); - } - } return Status::OK(); } diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp index 75ce050f10..98f7e8e535 100644 --- a/be/src/olap/schema_change.cpp +++ b/be/src/olap/schema_change.cpp @@ -272,7 +272,6 @@ Status BlockChanger::change_block(vectorized::Block* ref_block, if (_where_expr != nullptr) { vectorized::VExprContextSPtr ctx = nullptr; RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(*_where_expr, ctx)); - Defer defer {[&]() { ctx->close(state); }}; RETURN_IF_ERROR(ctx->prepare(state, row_desc)); RETURN_IF_ERROR(ctx->open(state)); @@ -304,7 +303,6 @@ Status BlockChanger::change_block(vectorized::Block* ref_block, } else if (_schema_mapping[idx].expr != nullptr) { vectorized::VExprContextSPtr ctx; RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(*_schema_mapping[idx].expr, ctx)); - Defer defer {[&]() { ctx->close(state); }}; RETURN_IF_ERROR(ctx->prepare(state, row_desc)); RETURN_IF_ERROR(ctx->open(state)); diff --git a/be/src/runtime/fold_constant_executor.cpp b/be/src/runtime/fold_constant_executor.cpp index 2d89dbf254..793f0209d1 100644 --- a/be/src/runtime/fold_constant_executor.cpp +++ b/be/src/runtime/fold_constant_executor.cpp @@ -83,9 +83,6 @@ Status FoldConstantExecutor::fold_constant_vexpr(const TFoldConstantParams& para const TExpr& texpr = n.second; // create expr tree from TExpr RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(texpr, ctx)); - - // close context expr - Defer defer {[&]() { ctx->close(_runtime_state.get()); }}; // prepare and open context RETURN_IF_ERROR(_prepare_and_open(ctx.get())); diff --git a/be/src/service/point_query_executor.cpp b/be/src/service/point_query_executor.cpp index 4e31442896..08249a00fc 100644 --- a/be/src/service/point_query_executor.cpp +++ b/be/src/service/point_query_executor.cpp @@ -46,11 +46,7 @@ namespace doris { -Reusable::~Reusable() { - for (auto& ctx : _output_exprs_ctxs) { - ctx->close(_runtime_state.get()); - } -} +Reusable::~Reusable() {} Status Reusable::init(const TDescriptorTable& t_desc_tbl, const std::vector& output_exprs, size_t block_size) { diff --git a/be/src/vec/common/sort/vsort_exec_exprs.cpp b/be/src/vec/common/sort/vsort_exec_exprs.cpp index a8dd70ae1c..44d40e1657 100644 --- a/be/src/vec/common/sort/vsort_exec_exprs.cpp +++ b/be/src/vec/common/sort/vsort_exec_exprs.cpp @@ -84,12 +84,6 @@ Status VSortExecExprs::open(RuntimeState* state) { return Status::OK(); } -void VSortExecExprs::close(RuntimeState* state) { - if (_materialize_tuple) { - VExpr::close(_sort_tuple_slot_expr_ctxs, state); - } - VExpr::close(_lhs_ordering_expr_ctxs, state); - VExpr::close(_rhs_ordering_expr_ctxs, state); -} +void VSortExecExprs::close(RuntimeState* state) {} } // namespace doris::vectorized diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp index 06dd72eb7b..6faaab0177 100644 --- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp @@ -97,11 +97,6 @@ RowGroupReader::RowGroupReader(io::FileReaderSPtr file_reader, RowGroupReader::~RowGroupReader() { _column_readers.clear(); - for (auto& ctx : _dict_filter_conjuncts) { - if (ctx) { - ctx->close(_state); - } - } _obj_pool->clear(); } diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp index 9b410f56ae..be94987bdb 100644 --- a/be/src/vec/exec/join/vhash_join_node.cpp +++ b/be/src/vec/exec/join/vhash_join_node.cpp @@ -763,12 +763,6 @@ Status HashJoinNode::alloc_resource(doris::RuntimeState* state) { } void HashJoinNode::release_resource(RuntimeState* state) { - VExpr::close(_build_expr_ctxs, state); - VExpr::close(_probe_expr_ctxs, state); - - for (auto& conjunct : _other_join_conjuncts) { - conjunct->close(state); - } _release_mem(); VJoinNodeBase::release_resource(state); } diff --git a/be/src/vec/exec/join/vjoin_node_base.cpp b/be/src/vec/exec/join/vjoin_node_base.cpp index cc50299db5..1171297d80 100644 --- a/be/src/vec/exec/join/vjoin_node_base.cpp +++ b/be/src/vec/exec/join/vjoin_node_base.cpp @@ -130,7 +130,6 @@ Status VJoinNodeBase::close(RuntimeState* state) { } void VJoinNodeBase::release_resource(RuntimeState* state) { - VExpr::close(_output_expr_ctxs, state); _join_block.clear(); ExecNode::release_resource(state); } diff --git a/be/src/vec/exec/join/vnested_loop_join_node.cpp b/be/src/vec/exec/join/vnested_loop_join_node.cpp index 491a9c6854..41c397671d 100644 --- a/be/src/vec/exec/join/vnested_loop_join_node.cpp +++ b/be/src/vec/exec/join/vnested_loop_join_node.cpp @@ -710,10 +710,6 @@ bool VNestedLoopJoinNode::need_more_input_data() const { void VNestedLoopJoinNode::release_resource(doris::RuntimeState* state) { VJoinNodeBase::release_resource(state); - VExpr::close(_filter_src_expr_ctxs, state); - for (auto& conjunct : _join_conjuncts) { - conjunct->close(state); - } } } // namespace doris::vectorized diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp index 2d9100be5f..535dac3a68 100644 --- a/be/src/vec/exec/scan/vfile_scanner.cpp +++ b/be/src/vec/exec/scan/vfile_scanner.cpp @@ -910,40 +910,6 @@ Status VFileScanner::close(RuntimeState* state) { return Status::OK(); } - for (auto ctx : _dest_vexpr_ctx) { - if (ctx != nullptr) { - ctx->close(state); - } - } - - for (auto& it : _col_default_value_ctx) { - if (it.second != nullptr) { - it.second->close(state); - } - } - - for (auto& conjunct : _pre_conjunct_ctxs) { - conjunct->close(state); - } - - for (auto& conjunct : _push_down_conjuncts) { - conjunct->close(state); - } - - for (auto& [k, v] : _slot_id_to_filter_conjuncts) { - for (auto& ctx : v) { - if (ctx != nullptr) { - ctx->close(state); - } - } - } - - for (auto ctx : _not_single_slot_filter_conjuncts) { - if (ctx != nullptr) { - ctx->close(state); - } - } - if (config::enable_file_cache && _state->query_options().enable_file_cache) { io::FileCacheProfileReporter cache_profile(_profile); cache_profile.update(_file_cache_statistics.get()); diff --git a/be/src/vec/exec/scan/vscan_node.cpp b/be/src/vec/exec/scan/vscan_node.cpp index a4b11bc55c..4c55150e53 100644 --- a/be/src/vec/exec/scan/vscan_node.cpp +++ b/be/src/vec/exec/scan/vscan_node.cpp @@ -418,14 +418,6 @@ void VScanNode::release_resource(RuntimeState* state) { } } - for (auto& ctx : _stale_expr_ctxs) { - ctx->close(state); - } - - for (auto& ctx : _common_expr_ctxs_push_down) { - ctx->close(state); - } - ExecNode::release_resource(state); } @@ -638,16 +630,16 @@ Status VScanNode::_normalize_predicate(const VExprSPtr& conjunct_expr_root, VExp return Status::OK(); } else { if (left_child == nullptr) { - conjunct_expr_root->children()[0]->close(_state, context, + conjunct_expr_root->children()[0]->close(context, context->get_function_state_scope()); } if (right_child == nullptr) { - conjunct_expr_root->children()[1]->close(_state, context, + conjunct_expr_root->children()[1]->close(context, context->get_function_state_scope()); } // here only close the and expr self, do not close the child conjunct_expr_root->set_children({}); - conjunct_expr_root->close(_state, context, context->get_function_state_scope()); + conjunct_expr_root->close(context, context->get_function_state_scope()); } // here do not close VExpr* now diff --git a/be/src/vec/exec/scan/vscanner.cpp b/be/src/vec/exec/scan/vscanner.cpp index d7c6d20aa0..ee2368c392 100644 --- a/be/src/vec/exec/scan/vscanner.cpp +++ b/be/src/vec/exec/scan/vscanner.cpp @@ -145,17 +145,6 @@ Status VScanner::close(RuntimeState* state) { if (_is_closed) { return Status::OK(); } - for (auto& ctx : _stale_expr_ctxs) { - ctx->close(state); - } - - for (auto& conjunct : _conjuncts) { - conjunct->close(state); - } - - for (auto& ctx : _common_expr_ctxs_push_down) { - ctx->close(state); - } COUNTER_UPDATE(_parent->_scanner_wait_worker_timer, _scanner_wait_worker_timer); _is_closed = true; diff --git a/be/src/vec/exec/vaggregation_node.cpp b/be/src/vec/exec/vaggregation_node.cpp index 553b227d62..591ad3e55d 100644 --- a/be/src/vec/exec/vaggregation_node.cpp +++ b/be/src/vec/exec/vaggregation_node.cpp @@ -601,7 +601,6 @@ Status AggregationNode::sink(doris::RuntimeState* state, vectorized::Block* in_b void AggregationNode::release_resource(RuntimeState* state) { for (auto* aggregate_evaluator : _aggregate_evaluators) aggregate_evaluator->close(state); - VExpr::close(_probe_expr_ctxs, state); if (_executor.close) _executor.close(); /// _hash_table_size_counter may be null if prepare failed. diff --git a/be/src/vec/exec/vanalytic_eval_node.cpp b/be/src/vec/exec/vanalytic_eval_node.cpp index 41783a6267..09c64e752d 100644 --- a/be/src/vec/exec/vanalytic_eval_node.cpp +++ b/be/src/vec/exec/vanalytic_eval_node.cpp @@ -293,12 +293,6 @@ void VAnalyticEvalNode::release_resource(RuntimeState* state) { if (is_closed()) { return; } - - VExpr::close(_partition_by_eq_expr_ctxs, state); - VExpr::close(_order_by_eq_expr_ctxs, state); - for (size_t i = 0; i < _agg_functions_size; ++i) { - VExpr::close(_agg_expr_ctxs[i], state); - } for (auto* agg_function : _agg_functions) { agg_function->close(state); } diff --git a/be/src/vec/exec/vpartition_sort_node.cpp b/be/src/vec/exec/vpartition_sort_node.cpp index f52e8cb678..8d7e1ec866 100644 --- a/be/src/vec/exec/vpartition_sort_node.cpp +++ b/be/src/vec/exec/vpartition_sort_node.cpp @@ -335,7 +335,6 @@ Status VPartitionSortNode::close(RuntimeState* state) { } void VPartitionSortNode::release_resource(RuntimeState* state) { - VExpr::close(_partition_expr_ctxs, state); _vsort_exec_exprs.close(state); ExecNode::release_resource(state); } diff --git a/be/src/vec/exec/vrepeat_node.cpp b/be/src/vec/exec/vrepeat_node.cpp index 8a67e5a90d..7ac697338e 100644 --- a/be/src/vec/exec/vrepeat_node.cpp +++ b/be/src/vec/exec/vrepeat_node.cpp @@ -279,7 +279,6 @@ Status VRepeatNode::close(RuntimeState* state) { } void VRepeatNode::release_resource(RuntimeState* state) { - VExpr::close(_expr_ctxs, state); ExecNode::release_resource(state); } diff --git a/be/src/vec/exec/vset_operation_node.cpp b/be/src/vec/exec/vset_operation_node.cpp index 58b59c1b77..17db664c8a 100644 --- a/be/src/vec/exec/vset_operation_node.cpp +++ b/be/src/vec/exec/vset_operation_node.cpp @@ -176,9 +176,6 @@ VSetOperationNode::VSetOperationNode(ObjectPool* pool, const TPlan template void VSetOperationNode::release_resource(RuntimeState* state) { - for (auto& exprs : _child_expr_lists) { - VExpr::close(exprs, state); - } release_mem(); ExecNode::release_resource(state); } diff --git a/be/src/vec/exec/vtable_function_node.h b/be/src/vec/exec/vtable_function_node.h index cfe74860b1..040ca3f7af 100644 --- a/be/src/vec/exec/vtable_function_node.h +++ b/be/src/vec/exec/vtable_function_node.h @@ -63,8 +63,6 @@ public: bool need_more_input_data() const { return !_child_block.rows() && !_child_eos; } void release_resource(doris::RuntimeState* state) override { - VExpr::close(_vfn_ctxs, state); - if (_num_rows_filtered_counter != nullptr) { COUNTER_SET(_num_rows_filtered_counter, static_cast(_num_rows_filtered)); } diff --git a/be/src/vec/exec/vunion_node.cpp b/be/src/vec/exec/vunion_node.cpp index a2fef62076..7c079a3b46 100644 --- a/be/src/vec/exec/vunion_node.cpp +++ b/be/src/vec/exec/vunion_node.cpp @@ -317,12 +317,6 @@ void VUnionNode::release_resource(RuntimeState* state) { if (is_closed()) { return; } - for (auto& exprs : _const_expr_lists) { - VExpr::close(exprs, state); - } - for (auto& exprs : _child_expr_lists) { - VExpr::close(exprs, state); - } return ExecNode::release_resource(state); } diff --git a/be/src/vec/exprs/vbitmap_predicate.cpp b/be/src/vec/exprs/vbitmap_predicate.cpp index 81e08022d0..0e158298d8 100644 --- a/be/src/vec/exprs/vbitmap_predicate.cpp +++ b/be/src/vec/exprs/vbitmap_predicate.cpp @@ -117,10 +117,9 @@ doris::Status vectorized::VBitmapPredicate::execute(vectorized::VExprContext* co return Status::OK(); } -void vectorized::VBitmapPredicate::close(doris::RuntimeState* state, - vectorized::VExprContext* context, +void vectorized::VBitmapPredicate::close(vectorized::VExprContext* context, FunctionContext::FunctionStateScope scope) { - VExpr::close(state, context, scope); + VExpr::close(context, scope); } const std::string& vectorized::VBitmapPredicate::expr_name() const { diff --git a/be/src/vec/exprs/vbitmap_predicate.h b/be/src/vec/exprs/vbitmap_predicate.h index 8b4e6e00b8..4699b17c03 100644 --- a/be/src/vec/exprs/vbitmap_predicate.h +++ b/be/src/vec/exprs/vbitmap_predicate.h @@ -58,8 +58,7 @@ public: doris::Status open(doris::RuntimeState* state, VExprContext* context, FunctionContext::FunctionStateScope scope) override; - void close(doris::RuntimeState* state, VExprContext* context, - FunctionContext::FunctionStateScope scope) override; + void close(VExprContext* context, FunctionContext::FunctionStateScope scope) override; VExprSPtr clone() const override { return VBitmapPredicate::create_shared(*this); } diff --git a/be/src/vec/exprs/vbloom_predicate.cpp b/be/src/vec/exprs/vbloom_predicate.cpp index 1d11ee44af..06bd21a6eb 100644 --- a/be/src/vec/exprs/vbloom_predicate.cpp +++ b/be/src/vec/exprs/vbloom_predicate.cpp @@ -69,9 +69,8 @@ Status VBloomPredicate::open(RuntimeState* state, VExprContext* context, return Status::OK(); } -void VBloomPredicate::close(RuntimeState* state, VExprContext* context, - FunctionContext::FunctionStateScope scope) { - VExpr::close(state, context, scope); +void VBloomPredicate::close(VExprContext* context, FunctionContext::FunctionStateScope scope) { + VExpr::close(context, scope); } Status VBloomPredicate::execute(VExprContext* context, Block* block, int* result_column_id) { diff --git a/be/src/vec/exprs/vbloom_predicate.h b/be/src/vec/exprs/vbloom_predicate.h index aaef48a098..2f5f7e9380 100644 --- a/be/src/vec/exprs/vbloom_predicate.h +++ b/be/src/vec/exprs/vbloom_predicate.h @@ -49,8 +49,7 @@ public: VExprContext* context) override; doris::Status open(doris::RuntimeState* state, VExprContext* context, FunctionContext::FunctionStateScope scope) override; - void close(doris::RuntimeState* state, VExprContext* context, - FunctionContext::FunctionStateScope scope) override; + void close(VExprContext* context, FunctionContext::FunctionStateScope scope) override; VExprSPtr clone() const override { return VBloomPredicate::create_shared(*this); } const std::string& expr_name() const override; void set_filter(std::shared_ptr& filter); diff --git a/be/src/vec/exprs/vcase_expr.cpp b/be/src/vec/exprs/vcase_expr.cpp index f5f4172b6a..5c9cdd3c15 100644 --- a/be/src/vec/exprs/vcase_expr.cpp +++ b/be/src/vec/exprs/vcase_expr.cpp @@ -84,10 +84,9 @@ Status VCaseExpr::open(RuntimeState* state, VExprContext* context, return Status::OK(); } -void VCaseExpr::close(RuntimeState* state, VExprContext* context, - FunctionContext::FunctionStateScope scope) { +void VCaseExpr::close(VExprContext* context, FunctionContext::FunctionStateScope scope) { VExpr::close_function_context(context, scope, _function); - VExpr::close(state, context, scope); + VExpr::close(context, scope); } Status VCaseExpr::execute(VExprContext* context, Block* block, int* result_column_id) { diff --git a/be/src/vec/exprs/vcase_expr.h b/be/src/vec/exprs/vcase_expr.h index ece370b736..713b05d3de 100644 --- a/be/src/vec/exprs/vcase_expr.h +++ b/be/src/vec/exprs/vcase_expr.h @@ -49,8 +49,7 @@ public: VExprContext* context) override; virtual Status open(RuntimeState* state, VExprContext* context, FunctionContext::FunctionStateScope scope) override; - virtual void close(RuntimeState* state, VExprContext* context, - FunctionContext::FunctionStateScope scope) override; + virtual void close(VExprContext* context, FunctionContext::FunctionStateScope scope) override; VExprSPtr clone() const override { return VCaseExpr::create_shared(*this); } virtual const std::string& expr_name() const override; virtual std::string debug_string() const override; diff --git a/be/src/vec/exprs/vcast_expr.cpp b/be/src/vec/exprs/vcast_expr.cpp index 21687b7cf2..ef4dd08115 100644 --- a/be/src/vec/exprs/vcast_expr.cpp +++ b/be/src/vec/exprs/vcast_expr.cpp @@ -86,10 +86,9 @@ doris::Status VCastExpr::open(doris::RuntimeState* state, VExprContext* context, return Status::OK(); } -void VCastExpr::close(doris::RuntimeState* state, VExprContext* context, - FunctionContext::FunctionStateScope scope) { +void VCastExpr::close(VExprContext* context, FunctionContext::FunctionStateScope scope) { VExpr::close_function_context(context, scope, _function); - VExpr::close(state, context, scope); + VExpr::close(context, scope); } doris::Status VCastExpr::execute(VExprContext* context, doris::vectorized::Block* block, diff --git a/be/src/vec/exprs/vcast_expr.h b/be/src/vec/exprs/vcast_expr.h index 7589ab38d5..243882eb5b 100644 --- a/be/src/vec/exprs/vcast_expr.h +++ b/be/src/vec/exprs/vcast_expr.h @@ -49,8 +49,7 @@ public: VExprContext* context) override; virtual doris::Status open(doris::RuntimeState* state, VExprContext* context, FunctionContext::FunctionStateScope scope) override; - virtual void close(doris::RuntimeState* state, VExprContext* context, - FunctionContext::FunctionStateScope scope) override; + virtual void close(VExprContext* context, FunctionContext::FunctionStateScope scope) override; virtual VExprSPtr clone() const override { return VCastExpr::create_shared(*this); } virtual const std::string& expr_name() const override; virtual std::string debug_string() const override; diff --git a/be/src/vec/exprs/vectorized_agg_fn.cpp b/be/src/vec/exprs/vectorized_agg_fn.cpp index 4270143878..40db922312 100644 --- a/be/src/vec/exprs/vectorized_agg_fn.cpp +++ b/be/src/vec/exprs/vectorized_agg_fn.cpp @@ -206,12 +206,12 @@ Status AggFnEvaluator::open(RuntimeState* state) { return VExpr::open(_input_exprs_ctxs, state); } -void AggFnEvaluator::close(RuntimeState* state) { - VExpr::close(_input_exprs_ctxs, state); -} +void AggFnEvaluator::close(RuntimeState* state) {} + void AggFnEvaluator::create(AggregateDataPtr place) { _function->create(place); } + void AggFnEvaluator::destroy(AggregateDataPtr place) { _function->destroy(place); } diff --git a/be/src/vec/exprs/vectorized_fn_call.cpp b/be/src/vec/exprs/vectorized_fn_call.cpp index 582acb9da9..3334bf0e81 100644 --- a/be/src/vec/exprs/vectorized_fn_call.cpp +++ b/be/src/vec/exprs/vectorized_fn_call.cpp @@ -130,10 +130,9 @@ Status VectorizedFnCall::open(RuntimeState* state, VExprContext* context, return Status::OK(); } -void VectorizedFnCall::close(RuntimeState* state, VExprContext* context, - FunctionContext::FunctionStateScope scope) { +void VectorizedFnCall::close(VExprContext* context, FunctionContext::FunctionStateScope scope) { VExpr::close_function_context(context, scope, _function); - VExpr::close(state, context, scope); + VExpr::close(context, scope); } Status VectorizedFnCall::execute(VExprContext* context, vectorized::Block* block, diff --git a/be/src/vec/exprs/vectorized_fn_call.h b/be/src/vec/exprs/vectorized_fn_call.h index 2d64f9a341..034fbb19f9 100644 --- a/be/src/vec/exprs/vectorized_fn_call.h +++ b/be/src/vec/exprs/vectorized_fn_call.h @@ -49,8 +49,7 @@ public: Status prepare(RuntimeState* state, const RowDescriptor& desc, VExprContext* context) override; Status open(RuntimeState* state, VExprContext* context, FunctionContext::FunctionStateScope scope) override; - void close(RuntimeState* state, VExprContext* context, - FunctionContext::FunctionStateScope scope) override; + void close(VExprContext* context, FunctionContext::FunctionStateScope scope) override; VExprSPtr clone() const override { return VectorizedFnCall::create_shared(*this); } const std::string& expr_name() const override; std::string debug_string() const override; diff --git a/be/src/vec/exprs/vexpr.cpp b/be/src/vec/exprs/vexpr.cpp index fe22bbc8da..919c2f305e 100644 --- a/be/src/vec/exprs/vexpr.cpp +++ b/be/src/vec/exprs/vexpr.cpp @@ -121,10 +121,9 @@ Status VExpr::open(RuntimeState* state, VExprContext* context, return Status::OK(); } -void VExpr::close(RuntimeState* state, VExprContext* context, - FunctionContext::FunctionStateScope scope) { +void VExpr::close(VExprContext* context, FunctionContext::FunctionStateScope scope) { for (int i = 0; i < _children.size(); ++i) { - _children[i]->close(state, context, scope); + _children[i]->close(context, scope); } } @@ -309,12 +308,6 @@ Status VExpr::prepare(const VExprContextSPtrs& ctxs, RuntimeState* state, return Status::OK(); } -void VExpr::close(const VExprContextSPtrs& ctxs, RuntimeState* state) { - for (auto ctx : ctxs) { - ctx->close(state); - } -} - Status VExpr::open(const VExprContextSPtrs& ctxs, RuntimeState* state) { for (int i = 0; i < ctxs.size(); ++i) { RETURN_IF_ERROR(ctxs[i]->open(state)); diff --git a/be/src/vec/exprs/vexpr.h b/be/src/vec/exprs/vexpr.h index 072ba48af3..ca1e241768 100644 --- a/be/src/vec/exprs/vexpr.h +++ b/be/src/vec/exprs/vexpr.h @@ -111,8 +111,7 @@ public: /// If scope if FRAGMENT_LOCAL, both fragment- and thread-local state should be torn /// down. Otherwise, if scope is THREAD_LOCAL, only thread-local state should be torn /// down. - virtual void close(RuntimeState* state, VExprContext* context, - FunctionContext::FunctionStateScope scope); + virtual void close(VExprContext* context, FunctionContext::FunctionStateScope scope); DataTypePtr& data_type() { return _data_type; } @@ -140,8 +139,6 @@ public: static Status clone_if_not_exists(const VExprContextSPtrs& ctxs, RuntimeState* state, VExprContextSPtrs& new_ctxs); - static void close(const VExprContextSPtrs& ctxs, RuntimeState* state); - bool is_nullable() const { return _data_type->is_nullable(); } PrimitiveType result_type() const { return _type.type; } diff --git a/be/src/vec/exprs/vexpr_context.cpp b/be/src/vec/exprs/vexpr_context.cpp index 50d17ff531..5a309a0d49 100644 --- a/be/src/vec/exprs/vexpr_context.cpp +++ b/be/src/vec/exprs/vexpr_context.cpp @@ -45,15 +45,16 @@ VExprContext::VExprContext(const VExprSPtr& expr) _is_clone(false), _prepared(false), _opened(false), - _closed(false), _last_result_column_id(-1) {} VExprContext::~VExprContext() { - // Do not delete this code, this code here is used to check if forget to close the opened context - // Or there will be memory leak - DCHECK(!_prepared || _closed || k_doris_exit) - << " prepare:" << _prepared << " closed:" << _closed - << " expr:" << _root->debug_string(); + // In runtime filter, only create expr context to get expr root, will not call + // prepare or open, so that it is not need to call close. And call close may core + // because the function context in expr is not set. + if (!_prepared || !_opened) { + return; + } + close(); } doris::Status VExprContext::execute(doris::vectorized::Block* block, int* result_column_id) { @@ -84,12 +85,14 @@ doris::Status VExprContext::open(doris::RuntimeState* state) { return _root->open(state, this, scope); } -void VExprContext::close(doris::RuntimeState* state) { - DCHECK(!_closed); +void VExprContext::close() { + // Sometimes expr context may not have a root, then it need not call close + if (_root == nullptr) { + return; + } FunctionContext::FunctionStateScope scope = _is_clone ? FunctionContext::THREAD_LOCAL : FunctionContext::FRAGMENT_LOCAL; - _root->close(state, this, scope); - _closed = true; + _root->close(this, scope); } doris::Status VExprContext::clone(RuntimeState* state, VExprContextSPtr& new_ctx) { diff --git a/be/src/vec/exprs/vexpr_context.h b/be/src/vec/exprs/vexpr_context.h index 3332e6f816..cc88902586 100644 --- a/be/src/vec/exprs/vexpr_context.h +++ b/be/src/vec/exprs/vexpr_context.h @@ -44,7 +44,6 @@ public: ~VExprContext(); [[nodiscard]] Status prepare(RuntimeState* state, const RowDescriptor& row_desc); [[nodiscard]] Status open(RuntimeState* state); - void close(RuntimeState* state); [[nodiscard]] Status clone(RuntimeState* state, VExprContextSPtr& new_ctx); [[nodiscard]] Status execute(Block* block, int* result_column_id); @@ -117,7 +116,6 @@ public: _is_clone = other._is_clone; _prepared = other._prepared; _opened = other._opened; - _closed = other._closed; for (auto& fn : other._fn_contexts) { _fn_contexts.emplace_back(fn->clone()); @@ -134,13 +132,16 @@ public: _is_clone = other._is_clone; _prepared = other._prepared; _opened = other._opened; - _closed = other._closed; _fn_contexts = std::move(other._fn_contexts); _last_result_column_id = other._last_result_column_id; _depth_num = other._depth_num; return *this; } +private: + // Close method is called in vexpr context dector, not need call expicility + void close(); + private: friend class VExpr; @@ -153,7 +154,6 @@ private: /// Variables keeping track of current state. bool _prepared; bool _opened; - bool _closed; /// FunctionContexts for each registered expression. The FunctionContexts are created /// and owned by this VExprContext. diff --git a/be/src/vec/exprs/vin_predicate.cpp b/be/src/vec/exprs/vin_predicate.cpp index 3aecb5fbca..83e6bd6320 100644 --- a/be/src/vec/exprs/vin_predicate.cpp +++ b/be/src/vec/exprs/vin_predicate.cpp @@ -83,10 +83,9 @@ Status VInPredicate::open(RuntimeState* state, VExprContext* context, return Status::OK(); } -void VInPredicate::close(RuntimeState* state, VExprContext* context, - FunctionContext::FunctionStateScope scope) { +void VInPredicate::close(VExprContext* context, FunctionContext::FunctionStateScope scope) { VExpr::close_function_context(context, scope, _function); - VExpr::close(state, context, scope); + VExpr::close(context, scope); } Status VInPredicate::execute(VExprContext* context, Block* block, int* result_column_id) { diff --git a/be/src/vec/exprs/vin_predicate.h b/be/src/vec/exprs/vin_predicate.h index 4d047de6a2..02b2953094 100644 --- a/be/src/vec/exprs/vin_predicate.h +++ b/be/src/vec/exprs/vin_predicate.h @@ -48,8 +48,7 @@ public: VExprContext* context) override; doris::Status open(doris::RuntimeState* state, VExprContext* context, FunctionContext::FunctionStateScope scope) override; - void close(doris::RuntimeState* state, VExprContext* context, - FunctionContext::FunctionStateScope scope) override; + void close(VExprContext* context, FunctionContext::FunctionStateScope scope) override; VExprSPtr clone() const override { return VInPredicate::create_shared(*this); } const std::string& expr_name() const override; diff --git a/be/src/vec/exprs/vmatch_predicate.cpp b/be/src/vec/exprs/vmatch_predicate.cpp index e47a14f779..a06d248979 100644 --- a/be/src/vec/exprs/vmatch_predicate.cpp +++ b/be/src/vec/exprs/vmatch_predicate.cpp @@ -93,10 +93,9 @@ Status VMatchPredicate::open(RuntimeState* state, VExprContext* context, return Status::OK(); } -void VMatchPredicate::close(RuntimeState* state, VExprContext* context, - FunctionContext::FunctionStateScope scope) { +void VMatchPredicate::close(VExprContext* context, FunctionContext::FunctionStateScope scope) { VExpr::close_function_context(context, scope, _function); - VExpr::close(state, context, scope); + VExpr::close(context, scope); } Status VMatchPredicate::execute(VExprContext* context, Block* block, int* result_column_id) { diff --git a/be/src/vec/exprs/vmatch_predicate.h b/be/src/vec/exprs/vmatch_predicate.h index 0c0a9d8ea8..2868454db3 100644 --- a/be/src/vec/exprs/vmatch_predicate.h +++ b/be/src/vec/exprs/vmatch_predicate.h @@ -50,8 +50,7 @@ public: VExprContext* context) override; doris::Status open(doris::RuntimeState* state, VExprContext* context, FunctionContext::FunctionStateScope scope) override; - void close(doris::RuntimeState* state, VExprContext* context, - FunctionContext::FunctionStateScope scope) override; + void close(VExprContext* context, FunctionContext::FunctionStateScope scope) override; VExprSPtr clone() const override { return VMatchPredicate::create_shared(*this); } const std::string& expr_name() const override; const std::string& function_name() const; diff --git a/be/src/vec/exprs/vruntimefilter_wrapper.cpp b/be/src/vec/exprs/vruntimefilter_wrapper.cpp index 1383d98c60..1d498f6100 100644 --- a/be/src/vec/exprs/vruntimefilter_wrapper.cpp +++ b/be/src/vec/exprs/vruntimefilter_wrapper.cpp @@ -66,9 +66,9 @@ Status VRuntimeFilterWrapper::open(RuntimeState* state, VExprContext* context, return _impl->open(state, context, scope); } -void VRuntimeFilterWrapper::close(RuntimeState* state, VExprContext* context, +void VRuntimeFilterWrapper::close(VExprContext* context, FunctionContext::FunctionStateScope scope) { - _impl->close(state, context, scope); + _impl->close(context, scope); } bool VRuntimeFilterWrapper::is_constant() const { diff --git a/be/src/vec/exprs/vruntimefilter_wrapper.h b/be/src/vec/exprs/vruntimefilter_wrapper.h index 259484bd78..4c76acd114 100644 --- a/be/src/vec/exprs/vruntimefilter_wrapper.h +++ b/be/src/vec/exprs/vruntimefilter_wrapper.h @@ -55,8 +55,7 @@ public: FunctionContext::FunctionStateScope scope) override; std::string debug_string() const override { return _impl->debug_string(); } bool is_constant() const override; - void close(doris::RuntimeState* state, VExprContext* context, - FunctionContext::FunctionStateScope scope) override; + void close(VExprContext* context, FunctionContext::FunctionStateScope scope) override; VExprSPtr clone() const override { return VRuntimeFilterWrapper::create_shared(*this); } const std::string& expr_name() const override; const VExprSPtrs& children() const override { return _impl->children(); } diff --git a/be/src/vec/exprs/vschema_change_expr.cpp b/be/src/vec/exprs/vschema_change_expr.cpp index c8da5fccf3..f8bf537331 100644 --- a/be/src/vec/exprs/vschema_change_expr.cpp +++ b/be/src/vec/exprs/vschema_change_expr.cpp @@ -73,9 +73,8 @@ Status VSchemaChangeExpr::open(doris::RuntimeState* state, VExprContext* context return Status::OK(); } -void VSchemaChangeExpr::close(doris::RuntimeState* state, VExprContext* context, - FunctionContext::FunctionStateScope scope) { - VExpr::close(state, context, scope); +void VSchemaChangeExpr::close(VExprContext* context, FunctionContext::FunctionStateScope scope) { + VExpr::close(context, scope); } Status VSchemaChangeExpr::execute(VExprContext* context, doris::vectorized::Block* block, diff --git a/be/src/vec/exprs/vschema_change_expr.h b/be/src/vec/exprs/vschema_change_expr.h index 9dd43a0ae2..f6bf96aa85 100644 --- a/be/src/vec/exprs/vschema_change_expr.h +++ b/be/src/vec/exprs/vschema_change_expr.h @@ -53,8 +53,7 @@ public: VExprContext* context) override; Status open(doris::RuntimeState* state, VExprContext* context, FunctionContext::FunctionStateScope scope) override; - void close(doris::RuntimeState* state, VExprContext* context, - FunctionContext::FunctionStateScope scope) override; + void close(VExprContext* context, FunctionContext::FunctionStateScope scope) override; VExprSPtr clone() const override { return VSchemaChangeExpr::create_shared(*this); } const std::string& expr_name() const override; std::string debug_string() const override; diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp index 5886f82dad..da2c3da516 100644 --- a/be/src/vec/sink/vdata_stream_sender.cpp +++ b/be/src/vec/sink/vdata_stream_sender.cpp @@ -696,7 +696,6 @@ Status VDataStreamSender::close(RuntimeState* state, Status exec_status) { final_st = st; } } - VExpr::close(_partition_expr_ctxs, state); DataSink::close(state, exec_status); return final_st; } diff --git a/be/src/vec/sink/vmemory_scratch_sink.cpp b/be/src/vec/sink/vmemory_scratch_sink.cpp index f54465c0ad..b89747a9a2 100644 --- a/be/src/vec/sink/vmemory_scratch_sink.cpp +++ b/be/src/vec/sink/vmemory_scratch_sink.cpp @@ -98,7 +98,6 @@ Status MemoryScratchSink::close(RuntimeState* state, Status exec_status) { if (_queue != nullptr) { _queue->blocking_put(nullptr); } - VExpr::close(_output_vexpr_ctxs, state); return DataSink::close(state, exec_status); } diff --git a/be/src/vec/sink/vresult_file_sink.cpp b/be/src/vec/sink/vresult_file_sink.cpp index a65bda5af2..7f72c5c651 100644 --- a/be/src/vec/sink/vresult_file_sink.cpp +++ b/be/src/vec/sink/vresult_file_sink.cpp @@ -186,8 +186,6 @@ Status VResultFileSink::close(RuntimeState* state, Status exec_status) { _output_block->clear(); } - VExpr::close(_output_vexpr_ctxs, state); - _closed = true; return Status::OK(); } diff --git a/be/src/vec/sink/vresult_sink.cpp b/be/src/vec/sink/vresult_sink.cpp index 445d342a5b..15b953cb1a 100644 --- a/be/src/vec/sink/vresult_sink.cpp +++ b/be/src/vec/sink/vresult_sink.cpp @@ -161,8 +161,6 @@ Status VResultSink::close(RuntimeState* state, Status exec_status) { state->exec_env()->result_mgr()->cancel_at_time( time(nullptr) + config::result_buffer_cancelled_interval_time, state->fragment_instance_id()); - - VExpr::close(_output_vexpr_ctxs, state); return DataSink::close(state, exec_status); } diff --git a/be/src/vec/sink/vtable_sink.cpp b/be/src/vec/sink/vtable_sink.cpp index 9640b3b4ec..becb0a9f9f 100644 --- a/be/src/vec/sink/vtable_sink.cpp +++ b/be/src/vec/sink/vtable_sink.cpp @@ -69,7 +69,6 @@ Status VTableSink::close(RuntimeState* state, Status exec_status) { if (_closed) { return Status::OK(); } - VExpr::close(_output_vexpr_ctxs, state); return Status::OK(); } } // namespace vectorized diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp index bcb7dd2526..799e3a83a2 100644 --- a/be/src/vec/sink/vtablet_sink.cpp +++ b/be/src/vec/sink/vtablet_sink.cpp @@ -127,11 +127,7 @@ public: int64_t partition_id; }; -IndexChannel::~IndexChannel() { - if (_where_clause != nullptr) { - _where_clause->close(_parent->_state); - } -} +IndexChannel::~IndexChannel() {} Status IndexChannel::init(RuntimeState* state, const std::vector& tablets) { SCOPED_CONSUME_MEM_TRACKER(_index_channel_tracker.get()); @@ -1402,7 +1398,6 @@ Status VOlapTableSink::close(RuntimeState* state, Status exec_status) { return _close_status; } SCOPED_TIMER(_close_timer); - vectorized::VExpr::close(_output_vexpr_ctxs, state); Status status = exec_status; if (status.ok()) { // only if status is ok can we call this _profile->total_time_counter(). diff --git a/be/test/vec/data_types/serde/data_type_serde_mysql_test.cpp b/be/test/vec/data_types/serde/data_type_serde_mysql_test.cpp index 8decfe1a09..1e21f6f463 100644 --- a/be/test/vec/data_types/serde/data_type_serde_mysql_test.cpp +++ b/be/test/vec/data_types/serde/data_type_serde_mysql_test.cpp @@ -281,9 +281,6 @@ void serialize_and_deserialize_mysql_test() { Status st = mysql_writer.append_block(block); EXPECT_TRUE(st.ok()); - for (auto expr : _output_vexpr_ctxs) { - expr->close(&runtime_stat); - } } TEST(DataTypeSerDeMysqlTest, ScalaSerDeTest) { diff --git a/be/test/vec/exprs/vexpr_test.cpp b/be/test/vec/exprs/vexpr_test.cpp index 01546ff758..51ae892af7 100644 --- a/be/test/vec/exprs/vexpr_test.cpp +++ b/be/test/vec/exprs/vexpr_test.cpp @@ -72,7 +72,6 @@ TEST(TEST_VEXPR, ABSTEST) { ASSERT_TRUE(state.ok()); state = context->open(&runtime_stat); ASSERT_TRUE(state.ok()); - context->close(&runtime_stat); } // Only the unit test depend on this, but it is wrong, should not use TTupleDesc to create tuple desc, not @@ -168,7 +167,6 @@ TEST(TEST_VEXPR, ABSTEST2) { ASSERT_TRUE(state.ok()); state = context->open(&runtime_stat); ASSERT_TRUE(state.ok()); - context->close(&runtime_stat); } namespace doris {