diff --git a/be/src/exec/es_http_scan_node.cpp b/be/src/exec/es_http_scan_node.cpp index be18743a46..9a3f5baf38 100644 --- a/be/src/exec/es_http_scan_node.cpp +++ b/be/src/exec/es_http_scan_node.cpp @@ -163,7 +163,7 @@ Status EsHttpScanNode::open(RuntimeState* state) { auto checker = [&](int index) { return _conjunct_to_predicate[index] != -1 && list[_conjunct_to_predicate[index]]; }; - std::string vconjunct_information = _peel_pushed_vconjunct(checker); + std::string vconjunct_information = _peel_pushed_vconjunct(state, checker); _scanner_profile->add_info_string("VconjunctExprTree", vconjunct_information); RETURN_IF_ERROR(start_scanners()); diff --git a/be/src/exec/olap_scan_node.cpp b/be/src/exec/olap_scan_node.cpp index c0ec6f1150..1cfb13dd4a 100644 --- a/be/src/exec/olap_scan_node.cpp +++ b/be/src/exec/olap_scan_node.cpp @@ -552,7 +552,7 @@ void OlapScanNode::remove_pushed_conjuncts(RuntimeState* state) { // filter idle conjunct in vexpr_contexts auto checker = [&](int index) { return _pushed_conjuncts_index.count(index); }; - std::string vconjunct_information = _peel_pushed_vconjunct(checker); + std::string vconjunct_information = _peel_pushed_vconjunct(state, checker); _scanner_profile->add_info_string("VconjunctExprTree", vconjunct_information); } diff --git a/be/src/exec/olap_scanner.h b/be/src/exec/olap_scanner.h index 21f11652b1..a00bbccb8f 100644 --- a/be/src/exec/olap_scanner.h +++ b/be/src/exec/olap_scanner.h @@ -60,7 +60,7 @@ public: virtual Status get_batch(RuntimeState* state, RowBatch* batch, bool* eof); - Status close(RuntimeState* state); + virtual Status close(RuntimeState* state); RuntimeState* runtime_state() { return _runtime_state; } diff --git a/be/src/exec/scan_node.cpp b/be/src/exec/scan_node.cpp index b05108f29d..b9b6032375 100644 --- a/be/src/exec/scan_node.cpp +++ b/be/src/exec/scan_node.cpp @@ -49,7 +49,8 @@ Status ScanNode::prepare(RuntimeState* state) { // It relies on the logic of function convertConjunctsToAndCompoundPredicate() of FE splicing expr. // It requires FE to satisfy each splicing with 'and' expr, and spliced from left to right, in order. // Expr tree specific forms do not require requirements. -std::string ScanNode::_peel_pushed_vconjunct(const std::function& checker) { +std::string ScanNode::_peel_pushed_vconjunct(RuntimeState* state, + const std::function& checker) { if (_vconjunct_ctx_ptr.get() == nullptr) { return "null"; } @@ -59,7 +60,7 @@ std::string ScanNode::_peel_pushed_vconjunct(const std::function& che if (conjunct_expr_root != nullptr) { vectorized::VExpr* new_conjunct_expr_root = vectorized::VectorizedUtils::dfs_peel_conjunct( - conjunct_expr_root, leaf_index, checker); + state, *_vconjunct_ctx_ptr.get(), conjunct_expr_root, leaf_index, checker); if (new_conjunct_expr_root == nullptr) { _vconjunct_ctx_ptr = nullptr; } else { diff --git a/be/src/exec/scan_node.h b/be/src/exec/scan_node.h index 92c8ac3efb..e4b4a58d30 100644 --- a/be/src/exec/scan_node.h +++ b/be/src/exec/scan_node.h @@ -94,6 +94,7 @@ public: protected: std::string _peel_pushed_vconjunct( + RuntimeState* state, const std::function& checker); // remove pushed expr from conjunct tree RuntimeProfile::Counter* _bytes_read_counter; // # bytes read from the scanner diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp index 333b76faa5..e3d83cfe11 100644 --- a/be/src/vec/exec/join/vhash_join_node.cpp +++ b/be/src/vec/exec/join/vhash_join_node.cpp @@ -132,21 +132,21 @@ struct ProcessRuntimeFilterBuild { if (_join_node->_runtime_filter_descs.empty()) { return Status::OK(); } - VRuntimeFilterSlots* runtime_filter_slots = - new VRuntimeFilterSlots(_join_node->_probe_expr_ctxs, _join_node->_build_expr_ctxs, - _join_node->_runtime_filter_descs); + VRuntimeFilterSlots runtime_filter_slots(_join_node->_probe_expr_ctxs, + _join_node->_build_expr_ctxs, + _join_node->_runtime_filter_descs); - RETURN_IF_ERROR(runtime_filter_slots->init(state, hash_table_ctx.hash_table.get_size())); + RETURN_IF_ERROR(runtime_filter_slots.init(state, hash_table_ctx.hash_table.get_size())); - if (!runtime_filter_slots->empty() && !_join_node->_inserted_rows.empty()) { + if (!runtime_filter_slots.empty() && !_join_node->_inserted_rows.empty()) { { SCOPED_TIMER(_join_node->_push_compute_timer); - runtime_filter_slots->insert(_join_node->_inserted_rows); + runtime_filter_slots.insert(_join_node->_inserted_rows); } } { SCOPED_TIMER(_join_node->_push_down_timer); - runtime_filter_slots->publish(); + runtime_filter_slots.publish(); } return Status::OK(); diff --git a/be/src/vec/exec/volap_scanner.cpp b/be/src/vec/exec/volap_scanner.cpp index 410fa8f6b6..509e80e6e4 100644 --- a/be/src/vec/exec/volap_scanner.cpp +++ b/be/src/vec/exec/volap_scanner.cpp @@ -83,4 +83,12 @@ Status VOlapScanner::get_block(RuntimeState* state, vectorized::Block* block, bo void VOlapScanner::set_tablet_reader() { _tablet_reader = std::make_unique(); } + +Status VOlapScanner::close(RuntimeState* state) { + if (_is_closed) { + return Status::OK(); + } + if (_vconjunct_ctx) _vconjunct_ctx->close(state); + return OlapScanner::close(state); +} } // namespace doris::vectorized diff --git a/be/src/vec/exec/volap_scanner.h b/be/src/vec/exec/volap_scanner.h index d054fdbf77..40db5e1c17 100644 --- a/be/src/vec/exec/volap_scanner.h +++ b/be/src/vec/exec/volap_scanner.h @@ -37,6 +37,8 @@ public: Status get_block(RuntimeState* state, vectorized::Block* block, bool* eof); + Status close(RuntimeState* state) override; + Status get_batch(RuntimeState* state, RowBatch* row_batch, bool* eos) override { return Status::NotSupported("Not Implemented VOlapScanNode Node::get_next scalar"); } diff --git a/be/src/vec/exprs/vexpr.h b/be/src/vec/exprs/vexpr.h index 26681baf91..671fa2250c 100644 --- a/be/src/vec/exprs/vexpr.h +++ b/be/src/vec/exprs/vexpr.h @@ -108,7 +108,10 @@ public: VExpr* parent, int* node_idx, VExpr** root_expr, VExprContext** ctx); const std::vector& children() const { return _children; } - void set_children(std::vector children) { _children = children; } + void set_children(RuntimeState* state, VExprContext* ctx, std::vector children) { + close(state, ctx, ctx->get_function_state_scope()); + _children = children; + } virtual std::string debug_string() const; static std::string debug_string(const std::vector& exprs); static std::string debug_string(const std::vector& ctxs); diff --git a/be/src/vec/exprs/vexpr_context.h b/be/src/vec/exprs/vexpr_context.h index 2df377d770..80da73531b 100644 --- a/be/src/vec/exprs/vexpr_context.h +++ b/be/src/vec/exprs/vexpr_context.h @@ -65,6 +65,10 @@ public: return _last_result_column_id; } + FunctionContext::FunctionStateScope get_function_state_scope() const { + return _is_clone ? FunctionContext::THREAD_LOCAL : FunctionContext::FRAGMENT_LOCAL; + } + private: friend class VExpr; diff --git a/be/src/vec/utils/util.hpp b/be/src/vec/utils/util.hpp index 6cf9a14471..8e50d12b5d 100644 --- a/be/src/vec/utils/util.hpp +++ b/be/src/vec/utils/util.hpp @@ -64,18 +64,20 @@ public: return data_types; } - static VExpr* dfs_peel_conjunct(VExpr* expr, int& leaf_index, - std::function checker) { + static VExpr* dfs_peel_conjunct(RuntimeState* state, VExprContext* context, VExpr* expr, + int& leaf_index, std::function checker) { static constexpr auto is_leaf = [](VExpr* expr) { return !expr->is_and_expr(); }; if (is_leaf(expr)) { return checker(leaf_index++) ? nullptr : expr; } else { - VExpr* left_child = dfs_peel_conjunct(expr->children()[0], leaf_index, checker); - VExpr* right_child = dfs_peel_conjunct(expr->children()[1], leaf_index, checker); + VExpr* left_child = + dfs_peel_conjunct(state, context, expr->children()[0], leaf_index, checker); + VExpr* right_child = + dfs_peel_conjunct(state, context, expr->children()[1], leaf_index, checker); if (left_child != nullptr && right_child != nullptr) { - expr->set_children({left_child, right_child}); + expr->set_children(state, context, {left_child, right_child}); return expr; } // here do not close Expr* now