From 8d7a9fd21ba3f22732f8a3b89c26cbf59c3cb14e Mon Sep 17 00:00:00 2001 From: yiguolei <676222867@qq.com> Date: Mon, 24 Apr 2023 10:32:11 +0800 Subject: [PATCH] [refactor](exceptionsafe) add factory creator to some class (#18978) make vexprecontext,vexpr,function,query context,runtimestate thread safe. --------- Co-authored-by: yiguolei --- be/src/common/status.h | 5 +++ be/src/exec/base_scanner.cpp | 2 +- be/src/exprs/runtime_filter.cpp | 25 ++++++++----- be/src/olap/push_handler.cpp | 4 +- be/src/olap/schema_change.cpp | 2 +- be/src/pipeline/pipeline_fragment_context.cpp | 4 +- be/src/runtime/fold_constant_executor.cpp | 4 +- be/src/runtime/plan_fragment_executor.cpp | 3 +- be/src/runtime/runtime_state.h | 3 ++ be/src/service/point_query_executor.cpp | 2 +- be/src/vec/common/schema_util.h | 1 + be/src/vec/common/sort/heap_sorter.cpp | 2 +- be/src/vec/common/sort/heap_sorter.h | 4 ++ be/src/vec/common/sort/sorter.cpp | 3 +- be/src/vec/common/sort/sorter.h | 4 ++ be/src/vec/common/sort/topn_sorter.cpp | 3 +- be/src/vec/common/sort/topn_sorter.h | 2 + .../format/parquet/vparquet_group_reader.cpp | 13 ++++--- be/src/vec/exec/scan/vfile_scanner.cpp | 3 +- be/src/vec/exec/scan/vscan_node.cpp | 4 +- be/src/vec/exec/vsort_node.cpp | 14 ++++--- .../varray_filter_function.cpp | 2 + .../lambda_function/varray_map_function.cpp | 2 + .../table_function/table_function_factory.cpp | 10 +++-- .../table_function/table_function_factory.h | 4 +- be/src/vec/exprs/table_function/vexplode.h | 2 + .../exprs/table_function/vexplode_bitmap.h | 2 + .../table_function/vexplode_json_array.h | 2 + .../exprs/table_function/vexplode_numbers.h | 2 + .../vec/exprs/table_function/vexplode_split.h | 2 + be/src/vec/exprs/varray_literal.h | 2 + be/src/vec/exprs/vbitmap_predicate.h | 4 +- be/src/vec/exprs/vbloom_predicate.h | 4 +- be/src/vec/exprs/vcase_expr.h | 4 +- be/src/vec/exprs/vcast_expr.h | 4 +- be/src/vec/exprs/vcolumn_ref.h | 4 +- be/src/vec/exprs/vcompound_pred.h | 6 ++- be/src/vec/exprs/vdirect_in_predicate.h | 4 +- be/src/vec/exprs/vectorized_agg_fn.cpp | 2 +- be/src/vec/exprs/vectorized_agg_fn.h | 2 + be/src/vec/exprs/vectorized_fn_call.h | 6 ++- be/src/vec/exprs/vexpr.cpp | 37 +++++++++---------- be/src/vec/exprs/vexpr.h | 7 ++++ be/src/vec/exprs/vexpr_context.cpp | 2 +- be/src/vec/exprs/vexpr_context.h | 3 ++ be/src/vec/exprs/vin_predicate.h | 4 +- be/src/vec/exprs/vinfo_func.h | 4 +- be/src/vec/exprs/vlambda_function_call_expr.h | 4 +- be/src/vec/exprs/vlambda_function_expr.h | 4 +- be/src/vec/exprs/vliteral.h | 6 ++- be/src/vec/exprs/vmap_literal.h | 2 + be/src/vec/exprs/vruntimefilter_wrapper.h | 4 +- be/src/vec/exprs/vschema_change_expr.cpp | 4 +- be/src/vec/exprs/vschema_change_expr.h | 4 +- be/src/vec/exprs/vslot_ref.h | 4 +- be/src/vec/exprs/vstruct_literal.h | 2 + be/src/vec/exprs/vtuple_is_null_predicate.h | 4 +- be/src/vec/utils/util.hpp | 5 +-- be/test/exprs/runtime_filter_test.cpp | 4 +- be/test/runtime/test_env.cc | 7 ---- be/test/runtime/test_env.h | 3 -- be/test/testutil/function_utils.cpp | 4 +- 62 files changed, 194 insertions(+), 101 deletions(-) diff --git a/be/src/common/status.h b/be/src/common/status.h index 9f71f167ff..f33993af64 100644 --- a/be/src/common/status.h +++ b/be/src/common/status.h @@ -21,6 +21,8 @@ #include "util/stack_util.h" #endif +#include "common/expected.h" + namespace doris { class PStatus; @@ -572,6 +574,9 @@ inline std::string Status::to_string() const { return _s; \ } \ } while (false); + +template +using Result = expected; } // namespace doris #ifdef WARN_UNUSED_RESULT #undef WARN_UNUSED_RESULT diff --git a/be/src/exec/base_scanner.cpp b/be/src/exec/base_scanner.cpp index 1f9b49a0f8..587be2f35e 100644 --- a/be/src/exec/base_scanner.cpp +++ b/be/src/exec/base_scanner.cpp @@ -77,7 +77,7 @@ BaseScanner::BaseScanner(RuntimeState* state, RuntimeProfile* profile, _scanner_eof(false) {} Status BaseScanner::open() { - _full_base_schema_view.reset(new vectorized::schema_util::FullBaseSchemaView); + _full_base_schema_view = vectorized::schema_util::FullBaseSchemaView::create_unique(); RETURN_IF_ERROR(init_expr_ctxes()); if (_params.__isset.strict_mode) { _strict_mode = _params.strict_mode; diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp index 4376eac810..e2ec2d57a2 100644 --- a/be/src/exprs/runtime_filter.cpp +++ b/be/src/exprs/runtime_filter.cpp @@ -277,7 +277,8 @@ Status create_literal(ObjectPool* pool, const TypeDescriptor& type, const void* return Status::InvalidArgument("Invalid type!"); } - *reinterpret_cast(expr) = pool->add(new vectorized::VLiteral(node)); + *reinterpret_cast(expr) = + pool->add(vectorized::VLiteral::create_unique(node).release()); return Status::OK(); } @@ -1782,11 +1783,13 @@ Status RuntimePredicateWrapper::get_push_vexprs(std::vector* node.__set_vector_opcode(to_in_opcode(_column_return_type)); node.__set_is_nullable(false); - auto in_pred = _pool->add(new vectorized::VDirectInPredicate(node)); + auto in_pred = + _pool->add(vectorized::VDirectInPredicate::create_unique(node).release()); in_pred->set_filter(_context.hybrid_set); auto cloned_vexpr = vprob_expr->root()->clone(_pool); in_pred->add_child(cloned_vexpr); - auto wrapper = _pool->add(new vectorized::VRuntimeFilterWrapper(node, in_pred)); + auto wrapper = _pool->add( + vectorized::VRuntimeFilterWrapper::create_unique(node, in_pred).release()); container->push_back(wrapper); } break; @@ -1804,7 +1807,8 @@ Status RuntimePredicateWrapper::get_push_vexprs(std::vector* max_pred->add_child(cloned_vexpr); max_pred->add_child(max_literal); container->push_back( - _pool->add(new vectorized::VRuntimeFilterWrapper(max_pred_node, max_pred))); + _pool->add(vectorized::VRuntimeFilterWrapper::create_unique(max_pred_node, max_pred) + .release())); // create min filter vectorized::VExpr* min_pred = nullptr; @@ -1818,7 +1822,8 @@ Status RuntimePredicateWrapper::get_push_vexprs(std::vector* min_pred->add_child(cloned_vexpr); min_pred->add_child(min_literal); container->push_back( - _pool->add(new vectorized::VRuntimeFilterWrapper(min_pred_node, min_pred))); + _pool->add(vectorized::VRuntimeFilterWrapper::create_unique(min_pred_node, min_pred) + .release())); break; } case RuntimeFilterType::BLOOM_FILTER: { @@ -1832,11 +1837,12 @@ Status RuntimePredicateWrapper::get_push_vexprs(std::vector* node.__isset.vector_opcode = true; node.__set_vector_opcode(to_in_opcode(_column_return_type)); node.__set_is_nullable(false); - auto bloom_pred = _pool->add(new vectorized::VBloomPredicate(node)); + auto bloom_pred = _pool->add(vectorized::VBloomPredicate::create_unique(node).release()); bloom_pred->set_filter(_context.bloom_filter_func); auto cloned_vexpr = vprob_expr->root()->clone(_pool); bloom_pred->add_child(cloned_vexpr); - auto wrapper = _pool->add(new vectorized::VRuntimeFilterWrapper(node, bloom_pred)); + auto wrapper = _pool->add( + vectorized::VRuntimeFilterWrapper::create_unique(node, bloom_pred).release()); container->push_back(wrapper); break; } @@ -1851,11 +1857,12 @@ Status RuntimePredicateWrapper::get_push_vexprs(std::vector* node.__isset.vector_opcode = true; node.__set_vector_opcode(to_in_opcode(_column_return_type)); node.__set_is_nullable(false); - auto bitmap_pred = _pool->add(new vectorized::VBitmapPredicate(node)); + auto bitmap_pred = _pool->add(vectorized::VBitmapPredicate::create_unique(node).release()); bitmap_pred->set_filter(_context.bitmap_filter_func); auto cloned_vexpr = vprob_expr->root()->clone(_pool); bitmap_pred->add_child(cloned_vexpr); - auto wrapper = _pool->add(new vectorized::VRuntimeFilterWrapper(node, bitmap_pred)); + auto wrapper = _pool->add( + vectorized::VRuntimeFilterWrapper::create_unique(node, bitmap_pred).release()); container->push_back(wrapper); break; } diff --git a/be/src/olap/push_handler.cpp b/be/src/olap/push_handler.cpp index 81a46753a3..250ee55822 100644 --- a/be/src/olap/push_handler.cpp +++ b/be/src/olap/push_handler.cpp @@ -326,8 +326,8 @@ Status PushBrokerReader::init(const Schema* schema, const TBrokerScanRange& t_sc fragment_params.protocol_version = PaloInternalServiceVersion::V1; TQueryOptions query_options; TQueryGlobals query_globals; - _runtime_state.reset( - new RuntimeState(params, query_options, query_globals, ExecEnv::GetInstance())); + _runtime_state = RuntimeState::create_unique(params, query_options, query_globals, + ExecEnv::GetInstance()); DescriptorTbl* desc_tbl = nullptr; Status status = DescriptorTbl::create(_runtime_state->obj_pool(), t_desc_tbl, &desc_tbl); if (UNLIKELY(!status.ok())) { diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp index e4d939b5a0..5d9f13c7f2 100644 --- a/be/src/olap/schema_change.cpp +++ b/be/src/olap/schema_change.cpp @@ -252,7 +252,7 @@ ColumnMapping* BlockChanger::get_mutable_column_mapping(size_t column_index) { Status BlockChanger::change_block(vectorized::Block* ref_block, vectorized::Block* new_block) const { ObjectPool pool; - RuntimeState* state = pool.add(new RuntimeState()); + RuntimeState* state = pool.add(RuntimeState::create_unique().release()); state->set_desc_tbl(&_desc_tbl); state->set_be_exec_version(_fe_compatible_version); RowDescriptor row_desc = diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index 4fcd343047..6c833133a9 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -208,8 +208,8 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re // 1. init _runtime_state _runtime_state = - std::make_unique(local_params, request.query_id, request.query_options, - _query_ctx->query_globals, _exec_env); + RuntimeState::create_unique(local_params, request.query_id, request.query_options, + _query_ctx->query_globals, _exec_env); _runtime_state->set_query_ctx(_query_ctx.get()); _runtime_state->set_query_mem_tracker(_query_ctx->query_mem_tracker); _runtime_state->set_tracer(std::move(tracer)); diff --git a/be/src/runtime/fold_constant_executor.cpp b/be/src/runtime/fold_constant_executor.cpp index 2beb14b266..36d63a12a1 100644 --- a/be/src/runtime/fold_constant_executor.cpp +++ b/be/src/runtime/fold_constant_executor.cpp @@ -138,8 +138,8 @@ Status FoldConstantExecutor::_init(const TQueryGlobals& query_globals, TExecPlanFragmentParams fragment_params; fragment_params.params = params; fragment_params.protocol_version = PaloInternalServiceVersion::V1; - _runtime_state.reset(new RuntimeState(fragment_params.params, query_options, query_globals, - ExecEnv::GetInstance())); + _runtime_state = RuntimeState::create_unique(fragment_params.params, query_options, + query_globals, ExecEnv::GetInstance()); DescriptorTbl* desc_tbl = nullptr; Status status = DescriptorTbl::create(_runtime_state->obj_pool(), TDescriptorTable(), &desc_tbl); diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp index d80ce3d43c..9b2f427b5f 100644 --- a/be/src/runtime/plan_fragment_executor.cpp +++ b/be/src/runtime/plan_fragment_executor.cpp @@ -113,7 +113,8 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request, const TQueryGlobals& query_globals = query_ctx == nullptr ? request.query_globals : query_ctx->query_globals; - _runtime_state.reset(new RuntimeState(params, request.query_options, query_globals, _exec_env)); + _runtime_state = + RuntimeState::create_unique(params, request.query_options, query_globals, _exec_env); _runtime_state->set_query_ctx(query_ctx); _runtime_state->set_query_mem_tracker(query_ctx == nullptr ? _exec_env->orphan_mem_tracker() : query_ctx->query_mem_tracker); diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index 903e48f3f0..f2b50bd92f 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -37,6 +37,7 @@ #include "cctz/time_zone.h" // IWYU pragma: no_include #include "common/compiler_util.h" // IWYU pragma: keep +#include "common/factory_creator.h" #include "common/status.h" #include "util/runtime_profile.h" #include "util/telemetry/telemetry.h" @@ -53,6 +54,8 @@ class QueryContext; // A collection of items that are part of the global state of a // query and shared across all execution nodes of that query. class RuntimeState { + ENABLE_FACTORY_CREATOR(RuntimeState); + public: // for ut only RuntimeState(const TUniqueId& fragment_instance_id, const TQueryOptions& query_options, diff --git a/be/src/service/point_query_executor.cpp b/be/src/service/point_query_executor.cpp index 02947f0d37..3c77fcd2f3 100644 --- a/be/src/service/point_query_executor.cpp +++ b/be/src/service/point_query_executor.cpp @@ -49,7 +49,7 @@ Reusable::~Reusable() { Status Reusable::init(const TDescriptorTable& t_desc_tbl, const std::vector& output_exprs, size_t block_size) { - _runtime_state.reset(new RuntimeState()); + _runtime_state = RuntimeState::create_unique(); RETURN_IF_ERROR(DescriptorTbl::create(_runtime_state->obj_pool(), t_desc_tbl, &_desc_tbl)); _runtime_state->set_desc_tbl(_desc_tbl); _block_pool.resize(block_size); diff --git a/be/src/vec/common/schema_util.h b/be/src/vec/common/schema_util.h index f0d86afbb4..978eaa19c7 100644 --- a/be/src/vec/common/schema_util.h +++ b/be/src/vec/common/schema_util.h @@ -87,6 +87,7 @@ Status align_block_with_schema(const TabletSchema& schema, int64_t table_id /*fo // maybe use col_unique_id as key in the future // but for dynamic table, column name if ok struct FullBaseSchemaView { + ENABLE_FACTORY_CREATOR(FullBaseSchemaView); phmap::flat_hash_map column_name_to_column; int32_t schema_version = -1; int32_t table_id = 0; diff --git a/be/src/vec/common/sort/heap_sorter.cpp b/be/src/vec/common/sort/heap_sorter.cpp index 3e417b8504..db63338360 100644 --- a/be/src/vec/common/sort/heap_sorter.cpp +++ b/be/src/vec/common/sort/heap_sorter.cpp @@ -44,7 +44,7 @@ HeapSorter::HeapSorter(VSortExecExprs& vsort_exec_exprs, int limit, int64_t offs : Sorter(vsort_exec_exprs, limit, offset, pool, is_asc_order, nulls_first), _data_size(0), _heap_size(limit + offset), - _heap(std::make_unique()), + _heap(SortingHeap::create_unique()), _topn_filter_rows(0), _init_sort_descs(false) {} diff --git a/be/src/vec/common/sort/heap_sorter.h b/be/src/vec/common/sort/heap_sorter.h index eba924cb9f..f036e9b360 100644 --- a/be/src/vec/common/sort/heap_sorter.h +++ b/be/src/vec/common/sort/heap_sorter.h @@ -44,6 +44,8 @@ class VSortExecExprs; namespace doris::vectorized { class SortingHeap { + ENABLE_FACTORY_CREATOR(SortingHeap); + public: const HeapSortCursorImpl& top() { return _queue.top(); } @@ -71,6 +73,8 @@ private: }; class HeapSorter final : public Sorter { + ENABLE_FACTORY_CREATOR(HeapSorter); + public: HeapSorter(VSortExecExprs& vsort_exec_exprs, int limit, int64_t offset, ObjectPool* pool, std::vector& is_asc_order, std::vector& nulls_first, diff --git a/be/src/vec/common/sort/sorter.cpp b/be/src/vec/common/sort/sorter.cpp index 982aafc1a8..b8bf4291ab 100644 --- a/be/src/vec/common/sort/sorter.cpp +++ b/be/src/vec/common/sort/sorter.cpp @@ -313,8 +313,7 @@ FullSorter::FullSorter(VSortExecExprs& vsort_exec_exprs, int limit, int64_t offs std::vector& nulls_first, const RowDescriptor& row_desc, RuntimeState* state, RuntimeProfile* profile) : Sorter(vsort_exec_exprs, limit, offset, pool, is_asc_order, nulls_first), - _state(std::unique_ptr( - new MergeSorterState(row_desc, offset, limit, state, profile))) {} + _state(MergeSorterState::create_unique(row_desc, offset, limit, state, profile)) {} Status FullSorter::append_block(Block* block) { DCHECK(block->rows() > 0); diff --git a/be/src/vec/common/sort/sorter.h b/be/src/vec/common/sort/sorter.h index a7ab99e8cf..48ea77852a 100644 --- a/be/src/vec/common/sort/sorter.h +++ b/be/src/vec/common/sort/sorter.h @@ -46,6 +46,8 @@ namespace doris::vectorized { // TODO: now we only use merge sort class MergeSorterState { + ENABLE_FACTORY_CREATOR(MergeSorterState); + public: MergeSorterState(const RowDescriptor& row_desc, int64_t offset, int64_t limit, RuntimeState* state, RuntimeProfile* profile) @@ -179,6 +181,8 @@ protected: }; class FullSorter final : public Sorter { + ENABLE_FACTORY_CREATOR(FullSorter); + public: FullSorter(VSortExecExprs& vsort_exec_exprs, int limit, int64_t offset, ObjectPool* pool, std::vector& is_asc_order, std::vector& nulls_first, diff --git a/be/src/vec/common/sort/topn_sorter.cpp b/be/src/vec/common/sort/topn_sorter.cpp index 21c42eff0d..8a44528daf 100644 --- a/be/src/vec/common/sort/topn_sorter.cpp +++ b/be/src/vec/common/sort/topn_sorter.cpp @@ -44,8 +44,7 @@ TopNSorter::TopNSorter(VSortExecExprs& vsort_exec_exprs, int limit, int64_t offs std::vector& nulls_first, const RowDescriptor& row_desc, RuntimeState* state, RuntimeProfile* profile) : Sorter(vsort_exec_exprs, limit, offset, pool, is_asc_order, nulls_first), - _state(std::unique_ptr( - new MergeSorterState(row_desc, offset, limit, state, profile))), + _state(MergeSorterState::create_unique(row_desc, offset, limit, state, profile)), _row_desc(row_desc) {} Status TopNSorter::append_block(Block* block) { diff --git a/be/src/vec/common/sort/topn_sorter.h b/be/src/vec/common/sort/topn_sorter.h index a286eedc05..4fa7399329 100644 --- a/be/src/vec/common/sort/topn_sorter.h +++ b/be/src/vec/common/sort/topn_sorter.h @@ -40,6 +40,8 @@ class VSortExecExprs; namespace doris::vectorized { class TopNSorter final : public Sorter { + ENABLE_FACTORY_CREATOR(TopNSorter); + public: TopNSorter(VSortExecExprs& vsort_exec_exprs, int limit, int64_t offset, ObjectPool* pool, std::vector& is_asc_order, std::vector& nulls_first, diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp index 865e6152c2..3253dcc015 100644 --- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp @@ -855,7 +855,7 @@ Status RowGroupReader::_rewrite_dict_conjuncts(std::vector& dict_codes, texpr_node.__set_child_type(TPrimitiveType::INT); texpr_node.__set_num_children(2); texpr_node.__set_is_nullable(is_nullable); - root = _obj_pool->add(new VectorizedFnCall(texpr_node)); + root = _obj_pool->add(VectorizedFnCall::create_unique(texpr_node).release()); } { SlotDescriptor* slot = nullptr; @@ -866,7 +866,7 @@ Status RowGroupReader::_rewrite_dict_conjuncts(std::vector& dict_codes, break; } } - VExpr* slot_ref_expr = _obj_pool->add(new VSlotRef(slot)); + VExpr* slot_ref_expr = _obj_pool->add(VSlotRef::create_unique(slot).release()); root->add_child(slot_ref_expr); } { @@ -877,7 +877,7 @@ Status RowGroupReader::_rewrite_dict_conjuncts(std::vector& dict_codes, int_literal.__set_value(dict_codes[0]); texpr_node.__set_int_literal(int_literal); texpr_node.__set_is_nullable(is_nullable); - VExpr* literal_expr = _obj_pool->add(new VLiteral(texpr_node)); + VExpr* literal_expr = _obj_pool->add(VLiteral::create_unique(texpr_node).release()); root->add_child(literal_expr); } } else { @@ -893,7 +893,7 @@ Status RowGroupReader::_rewrite_dict_conjuncts(std::vector& dict_codes, // VdirectInPredicate assume is_nullable = false. node.__set_is_nullable(false); - root = _obj_pool->add(new vectorized::VDirectInPredicate(node)); + root = _obj_pool->add(vectorized::VDirectInPredicate::create_unique(node).release()); std::shared_ptr hybrid_set( create_set(PrimitiveType::TYPE_INT, dict_codes.size())); for (int j = 0; j < dict_codes.size(); ++j) { @@ -910,11 +910,12 @@ Status RowGroupReader::_rewrite_dict_conjuncts(std::vector& dict_codes, break; } } - VExpr* slot_ref_expr = _obj_pool->add(new VSlotRef(slot)); + VExpr* slot_ref_expr = _obj_pool->add(VSlotRef::create_unique(slot).release()); root->add_child(slot_ref_expr); } } - VExprContext* rewritten_conjunct_ctx = _obj_pool->add(new VExprContext(root)); + VExprContext* rewritten_conjunct_ctx = + _obj_pool->add(VExprContext::create_unique(root).release()); RETURN_IF_ERROR(rewritten_conjunct_ctx->prepare(_state, *_row_descriptor)); RETURN_IF_ERROR(rewritten_conjunct_ctx->open(_state)); _dict_filter_conjuncts.push_back(rewritten_conjunct_ctx); diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp index 41d53cd4c3..b012e7dd9a 100644 --- a/be/src/vec/exec/scan/vfile_scanner.cpp +++ b/be/src/vec/exec/scan/vfile_scanner.cpp @@ -149,7 +149,8 @@ Status VFileScanner::_split_conjuncts(VExpr* conjunct_expr_root) { auto impl = conjunct_expr_root->get_impl(); // If impl is not null, which means this a conjuncts from runtime filter. VExpr* cur_expr = impl ? const_cast(impl) : conjunct_expr_root; - VExprContext* new_ctx = _state->obj_pool()->add(new VExprContext(cur_expr)); + VExprContext* new_ctx = + _state->obj_pool()->add(VExprContext::create_unique(cur_expr).release()); _vconjunct_ctx->clone_fn_contexts(new_ctx); RETURN_IF_ERROR(new_ctx->prepare(_state, *_default_val_row_desc)); RETURN_IF_ERROR(new_ctx->open(_state)); diff --git a/be/src/vec/exec/scan/vscan_node.cpp b/be/src/vec/exec/scan/vscan_node.cpp index 704cd356ae..6ff46427ed 100644 --- a/be/src/vec/exec/scan/vscan_node.cpp +++ b/be/src/vec/exec/scan/vscan_node.cpp @@ -412,14 +412,14 @@ Status VScanNode::_append_rf_into_conjuncts(std::vector& vexprs) { texpr_node.__set_opcode(TExprOpcode::COMPOUND_AND); texpr_node.__set_fn(fn); texpr_node.__set_is_nullable(last_expr->is_nullable() || vexprs[j]->is_nullable()); - VExpr* new_node = _pool->add(new VcompoundPred(texpr_node)); + VExpr* new_node = _pool->add(VcompoundPred::create_unique(texpr_node).release()); new_node->add_child(last_expr); DCHECK((vexprs[j])->get_impl() != nullptr); new_node->add_child(vexprs[j]); last_expr = new_node; _rf_vexpr_set.insert(vexprs[j]); } - auto new_vconjunct_ctx_ptr = _pool->add(new VExprContext(last_expr)); + auto new_vconjunct_ctx_ptr = _pool->add(VExprContext::create_unique(last_expr).release()); if (_vconjunct_ctx_ptr) { (*_vconjunct_ctx_ptr)->clone_fn_contexts(new_vconjunct_ctx_ptr); } diff --git a/be/src/vec/exec/vsort_node.cpp b/be/src/vec/exec/vsort_node.cpp index fd905df958..7ec3b0c4a5 100644 --- a/be/src/vec/exec/vsort_node.cpp +++ b/be/src/vec/exec/vsort_node.cpp @@ -68,16 +68,18 @@ Status VSortNode::init(const TPlanNode& tnode, RuntimeState* state) { if (_limit > 0 && _limit + _offset < HeapSorter::HEAP_SORT_THRESHOLD && (tnode.sort_node.sort_info.use_two_phase_read || tnode.sort_node.use_topn_opt || !row_desc.has_varlen_slots())) { - _sorter.reset(new HeapSorter(_vsort_exec_exprs, _limit, _offset, _pool, _is_asc_order, - _nulls_first, row_desc)); + _sorter = HeapSorter::create_unique(_vsort_exec_exprs, _limit, _offset, _pool, + _is_asc_order, _nulls_first, row_desc); _reuse_mem = false; } else if (_limit > 0 && row_desc.has_varlen_slots() && _limit + _offset < TopNSorter::TOPN_SORT_THRESHOLD) { - _sorter.reset(new TopNSorter(_vsort_exec_exprs, _limit, _offset, _pool, _is_asc_order, - _nulls_first, row_desc, state, _runtime_profile.get())); + _sorter = + TopNSorter::create_unique(_vsort_exec_exprs, _limit, _offset, _pool, _is_asc_order, + _nulls_first, row_desc, state, _runtime_profile.get()); } else { - _sorter.reset(new FullSorter(_vsort_exec_exprs, _limit, _offset, _pool, _is_asc_order, - _nulls_first, row_desc, state, _runtime_profile.get())); + _sorter = + FullSorter::create_unique(_vsort_exec_exprs, _limit, _offset, _pool, _is_asc_order, + _nulls_first, row_desc, state, _runtime_profile.get()); } // init runtime predicate _use_topn_opt = tnode.sort_node.use_topn_opt; diff --git a/be/src/vec/exprs/lambda_function/varray_filter_function.cpp b/be/src/vec/exprs/lambda_function/varray_filter_function.cpp index 0e76522e41..59d6dcb851 100644 --- a/be/src/vec/exprs/lambda_function/varray_filter_function.cpp +++ b/be/src/vec/exprs/lambda_function/varray_filter_function.cpp @@ -48,6 +48,8 @@ class VExprContext; namespace doris::vectorized { class ArrayFilterFunction : public LambdaFunction { + ENABLE_FACTORY_CREATOR(ArrayFilterFunction); + public: ~ArrayFilterFunction() override = default; diff --git a/be/src/vec/exprs/lambda_function/varray_map_function.cpp b/be/src/vec/exprs/lambda_function/varray_map_function.cpp index 2c4fc977ae..78a6ee8ac6 100644 --- a/be/src/vec/exprs/lambda_function/varray_map_function.cpp +++ b/be/src/vec/exprs/lambda_function/varray_map_function.cpp @@ -48,6 +48,8 @@ class VExprContext; namespace doris::vectorized { class ArrayMapFunction : public LambdaFunction { + ENABLE_FACTORY_CREATOR(ArrayMapFunction); + public: ~ArrayMapFunction() override = default; diff --git a/be/src/vec/exprs/table_function/table_function_factory.cpp b/be/src/vec/exprs/table_function/table_function_factory.cpp index 8cdeafba22..f5846f867a 100644 --- a/be/src/vec/exprs/table_function/table_function_factory.cpp +++ b/be/src/vec/exprs/table_function/table_function_factory.cpp @@ -31,13 +31,15 @@ namespace doris::vectorized { template struct TableFunctionCreator { - TableFunction* operator()() { return new TableFunctionType(); } + std::unique_ptr operator()() { return TableFunctionType::create_unique(); } }; template <> struct TableFunctionCreator { ExplodeJsonArrayType type; - TableFunction* operator()() const { return new VExplodeJsonArrayTableFunction(type); } + std::unique_ptr operator()() const { + return VExplodeJsonArrayTableFunction::create_unique(type); + } }; inline auto VExplodeJsonArrayIntCreator = @@ -47,7 +49,7 @@ inline auto VExplodeJsonArrayDoubleCreator = inline auto VExplodeJsonArrayStringCreator = TableFunctionCreator {ExplodeJsonArrayType::STRING}; -const std::unordered_map> +const std::unordered_map()>> TableFunctionFactory::_function_map { {"explode_split", TableFunctionCreator()}, {"explode_numbers", TableFunctionCreator()}, @@ -76,7 +78,7 @@ Status TableFunctionFactory::get_fn(const std::string& fn_name_raw, ObjectPool* auto fn_iterator = _function_map.find(fn_name_real); if (fn_iterator != _function_map.end()) { - *fn = pool->add(fn_iterator->second()); + *fn = pool->add(fn_iterator->second().release()); if (is_outer) { (*fn)->set_outer(); } diff --git a/be/src/vec/exprs/table_function/table_function_factory.h b/be/src/vec/exprs/table_function/table_function_factory.h index 4bbc0e7939..a68a1763fc 100644 --- a/be/src/vec/exprs/table_function/table_function_factory.h +++ b/be/src/vec/exprs/table_function/table_function_factory.h @@ -18,6 +18,7 @@ #pragma once #include +#include #include #include @@ -34,7 +35,8 @@ public: TableFunctionFactory() = delete; static Status get_fn(const std::string& fn_name_raw, ObjectPool* pool, TableFunction** fn); - const static std::unordered_map> _function_map; + const static std::unordered_map()>> + _function_map; }; } // namespace vectorized } // namespace doris diff --git a/be/src/vec/exprs/table_function/vexplode.h b/be/src/vec/exprs/table_function/vexplode.h index 9c540a186c..26771bd2b2 100644 --- a/be/src/vec/exprs/table_function/vexplode.h +++ b/be/src/vec/exprs/table_function/vexplode.h @@ -33,6 +33,8 @@ class Block; namespace doris::vectorized { class VExplodeTableFunction : public TableFunction { + ENABLE_FACTORY_CREATOR(VExplodeTableFunction); + public: VExplodeTableFunction(); diff --git a/be/src/vec/exprs/table_function/vexplode_bitmap.h b/be/src/vec/exprs/table_function/vexplode_bitmap.h index 1b459ad6bf..0dfde605de 100644 --- a/be/src/vec/exprs/table_function/vexplode_bitmap.h +++ b/be/src/vec/exprs/table_function/vexplode_bitmap.h @@ -35,6 +35,8 @@ class Block; namespace doris::vectorized { class VExplodeBitmapTableFunction final : public TableFunction { + ENABLE_FACTORY_CREATOR(VExplodeBitmapTableFunction); + public: VExplodeBitmapTableFunction(); ~VExplodeBitmapTableFunction() override = default; diff --git a/be/src/vec/exprs/table_function/vexplode_json_array.h b/be/src/vec/exprs/table_function/vexplode_json_array.h index c32aaed29e..2bdae9ed8e 100644 --- a/be/src/vec/exprs/table_function/vexplode_json_array.h +++ b/be/src/vec/exprs/table_function/vexplode_json_array.h @@ -102,6 +102,8 @@ struct ParsedData { }; class VExplodeJsonArrayTableFunction final : public TableFunction { + ENABLE_FACTORY_CREATOR(VExplodeJsonArrayTableFunction); + public: VExplodeJsonArrayTableFunction(ExplodeJsonArrayType type); ~VExplodeJsonArrayTableFunction() override = default; diff --git a/be/src/vec/exprs/table_function/vexplode_numbers.h b/be/src/vec/exprs/table_function/vexplode_numbers.h index b374e00750..f4f86e8d4d 100644 --- a/be/src/vec/exprs/table_function/vexplode_numbers.h +++ b/be/src/vec/exprs/table_function/vexplode_numbers.h @@ -37,6 +37,8 @@ class Block; namespace doris::vectorized { class VExplodeNumbersTableFunction : public TableFunction { + ENABLE_FACTORY_CREATOR(VExplodeNumbersTableFunction); + public: VExplodeNumbersTableFunction(); ~VExplodeNumbersTableFunction() override = default; diff --git a/be/src/vec/exprs/table_function/vexplode_split.h b/be/src/vec/exprs/table_function/vexplode_split.h index 901af08616..1155090bb1 100644 --- a/be/src/vec/exprs/table_function/vexplode_split.h +++ b/be/src/vec/exprs/table_function/vexplode_split.h @@ -38,6 +38,8 @@ class ColumnString; namespace doris::vectorized { class VExplodeSplitTableFunction final : public TableFunction { + ENABLE_FACTORY_CREATOR(VExplodeSplitTableFunction); + public: VExplodeSplitTableFunction(); ~VExplodeSplitTableFunction() override = default; diff --git a/be/src/vec/exprs/varray_literal.h b/be/src/vec/exprs/varray_literal.h index ff3cedc852..a84eeaec26 100644 --- a/be/src/vec/exprs/varray_literal.h +++ b/be/src/vec/exprs/varray_literal.h @@ -29,6 +29,8 @@ namespace vectorized { class VExprContext; class VArrayLiteral : public VLiteral { + ENABLE_FACTORY_CREATOR(VArrayLiteral); + public: VArrayLiteral(const TExprNode& node) : VLiteral(node, false) {} virtual ~VArrayLiteral() = default; diff --git a/be/src/vec/exprs/vbitmap_predicate.h b/be/src/vec/exprs/vbitmap_predicate.h index de1783871d..366a44ce46 100644 --- a/be/src/vec/exprs/vbitmap_predicate.h +++ b/be/src/vec/exprs/vbitmap_predicate.h @@ -42,6 +42,8 @@ namespace doris::vectorized { // used for bitmap runtime filter class VBitmapPredicate final : public VExpr { + ENABLE_FACTORY_CREATOR(VBitmapPredicate); + public: VBitmapPredicate(const TExprNode& node); @@ -60,7 +62,7 @@ public: FunctionContext::FunctionStateScope scope) override; VExpr* clone(doris::ObjectPool* pool) const override { - return pool->add(new VBitmapPredicate(*this)); + return pool->add(VBitmapPredicate::create_unique(*this).release()); } const std::string& expr_name() const override; diff --git a/be/src/vec/exprs/vbloom_predicate.h b/be/src/vec/exprs/vbloom_predicate.h index 3226b4db9a..d3de55596b 100644 --- a/be/src/vec/exprs/vbloom_predicate.h +++ b/be/src/vec/exprs/vbloom_predicate.h @@ -38,6 +38,8 @@ class VExprContext; namespace doris::vectorized { class VBloomPredicate final : public VExpr { + ENABLE_FACTORY_CREATOR(VBloomPredicate); + public: VBloomPredicate(const TExprNode& node); ~VBloomPredicate() override = default; @@ -50,7 +52,7 @@ public: void close(doris::RuntimeState* state, VExprContext* context, FunctionContext::FunctionStateScope scope) override; VExpr* clone(doris::ObjectPool* pool) const override { - return pool->add(new VBloomPredicate(*this)); + return pool->add(VBloomPredicate::create_unique(*this).release()); } const std::string& expr_name() const override; void set_filter(std::shared_ptr& filter); diff --git a/be/src/vec/exprs/vcase_expr.h b/be/src/vec/exprs/vcase_expr.h index 61199b0f47..c34e8d782e 100644 --- a/be/src/vec/exprs/vcase_expr.h +++ b/be/src/vec/exprs/vcase_expr.h @@ -39,6 +39,8 @@ class VExprContext; namespace doris::vectorized { class VCaseExpr final : public VExpr { + ENABLE_FACTORY_CREATOR(VCaseExpr); + public: VCaseExpr(const TExprNode& node); ~VCaseExpr() = default; @@ -51,7 +53,7 @@ public: virtual void close(RuntimeState* state, VExprContext* context, FunctionContext::FunctionStateScope scope) override; virtual VExpr* clone(ObjectPool* pool) const override { - return pool->add(new VCaseExpr(*this)); + return pool->add(VCaseExpr::create_unique(*this).release()); } virtual const std::string& expr_name() const override; virtual std::string debug_string() const override; diff --git a/be/src/vec/exprs/vcast_expr.h b/be/src/vec/exprs/vcast_expr.h index 33146eecf5..f4e59d1581 100644 --- a/be/src/vec/exprs/vcast_expr.h +++ b/be/src/vec/exprs/vcast_expr.h @@ -38,6 +38,8 @@ class VExprContext; namespace doris::vectorized { class VCastExpr final : public VExpr { + ENABLE_FACTORY_CREATOR(VCastExpr); + public: VCastExpr(const TExprNode& node) : VExpr(node) {} ~VCastExpr() = default; @@ -50,7 +52,7 @@ public: virtual void close(doris::RuntimeState* state, VExprContext* context, FunctionContext::FunctionStateScope scope) override; virtual VExpr* clone(doris::ObjectPool* pool) const override { - return pool->add(new VCastExpr(*this)); + return pool->add(VCastExpr::create_unique(*this).release()); } virtual const std::string& expr_name() const override; virtual std::string debug_string() const override; diff --git a/be/src/vec/exprs/vcolumn_ref.h b/be/src/vec/exprs/vcolumn_ref.h index b470807ab5..4ea582839d 100644 --- a/be/src/vec/exprs/vcolumn_ref.h +++ b/be/src/vec/exprs/vcolumn_ref.h @@ -24,6 +24,8 @@ namespace doris { namespace vectorized { class VColumnRef final : public VExpr { + ENABLE_FACTORY_CREATOR(VColumnRef); + public: //this is different of slotref is using slot_id find a column_id //slotref: need to find the equal id in tuple, then return column_id, the plan of FE is very important @@ -52,7 +54,7 @@ public: } VExpr* clone(doris::ObjectPool* pool) const override { - return pool->add(new VColumnRef(*this)); + return pool->add(VColumnRef::create_unique(*this).release()); } bool is_constant() const override { return false; } diff --git a/be/src/vec/exprs/vcompound_pred.h b/be/src/vec/exprs/vcompound_pred.h index 0068869443..9ce992d0ec 100644 --- a/be/src/vec/exprs/vcompound_pred.h +++ b/be/src/vec/exprs/vcompound_pred.h @@ -39,6 +39,8 @@ inline std::string compound_operator_to_string(TExprOpcode::type op) { } class VcompoundPred : public VectorizedFnCall { + ENABLE_FACTORY_CREATOR(VcompoundPred); + public: VcompoundPred(const TExprNode& node) : VectorizedFnCall(node) { _op = node.opcode; @@ -46,7 +48,9 @@ public: _expr_name = "VCompoundPredicate (" + _fn.name.function_name + ")"; } - VExpr* clone(ObjectPool* pool) const override { return pool->add(new VcompoundPred(*this)); } + VExpr* clone(ObjectPool* pool) const override { + return pool->add(VcompoundPred::create_unique(*this).release()); + } const std::string& expr_name() const override { return _expr_name; } diff --git a/be/src/vec/exprs/vdirect_in_predicate.h b/be/src/vec/exprs/vdirect_in_predicate.h index b9321a5c2e..219dda17fc 100644 --- a/be/src/vec/exprs/vdirect_in_predicate.h +++ b/be/src/vec/exprs/vdirect_in_predicate.h @@ -22,6 +22,8 @@ namespace doris::vectorized { class VDirectInPredicate final : public VExpr { + ENABLE_FACTORY_CREATOR(VDirectInPredicate); + public: VDirectInPredicate(const TExprNode& node) : VExpr(node), _filter(nullptr), _expr_name("direct_in_predicate") {} @@ -62,7 +64,7 @@ public: } VExpr* clone(doris::ObjectPool* pool) const override { - return pool->add(new VDirectInPredicate(*this)); + return pool->add(VDirectInPredicate::create_unique(*this).release()); } const std::string& expr_name() const override { return _expr_name; } diff --git a/be/src/vec/exprs/vectorized_agg_fn.cpp b/be/src/vec/exprs/vectorized_agg_fn.cpp index 80a1785349..6576fd1048 100644 --- a/be/src/vec/exprs/vectorized_agg_fn.cpp +++ b/be/src/vec/exprs/vectorized_agg_fn.cpp @@ -77,7 +77,7 @@ AggFnEvaluator::AggFnEvaluator(const TExprNode& desc) Status AggFnEvaluator::create(ObjectPool* pool, const TExpr& desc, const TSortInfo& sort_info, AggFnEvaluator** result) { - *result = pool->add(new AggFnEvaluator(desc.nodes[0])); + *result = pool->add(AggFnEvaluator::create_unique(desc.nodes[0]).release()); auto& agg_fn_evaluator = *result; int node_idx = 0; for (int i = 0; i < desc.nodes[0].num_children; ++i) { diff --git a/be/src/vec/exprs/vectorized_agg_fn.h b/be/src/vec/exprs/vectorized_agg_fn.h index 3ee25a878f..bb2b354280 100644 --- a/be/src/vec/exprs/vectorized_agg_fn.h +++ b/be/src/vec/exprs/vectorized_agg_fn.h @@ -46,6 +46,8 @@ class IColumn; class VExprContext; class AggFnEvaluator { + ENABLE_FACTORY_CREATOR(AggFnEvaluator); + public: static Status create(ObjectPool* pool, const TExpr& desc, const TSortInfo& sort_info, AggFnEvaluator** result); diff --git a/be/src/vec/exprs/vectorized_fn_call.h b/be/src/vec/exprs/vectorized_fn_call.h index d50ae3dd0b..dc5330433e 100644 --- a/be/src/vec/exprs/vectorized_fn_call.h +++ b/be/src/vec/exprs/vectorized_fn_call.h @@ -41,6 +41,8 @@ class VExprContext; namespace doris::vectorized { class VectorizedFnCall : public VExpr { + ENABLE_FACTORY_CREATOR(VectorizedFnCall); + public: VectorizedFnCall(const TExprNode& node); Status execute(VExprContext* context, Block* block, int* result_column_id) override; @@ -49,7 +51,9 @@ public: FunctionContext::FunctionStateScope scope) override; void close(RuntimeState* state, VExprContext* context, FunctionContext::FunctionStateScope scope) override; - VExpr* clone(ObjectPool* pool) const override { return pool->add(new VectorizedFnCall(*this)); } + VExpr* clone(ObjectPool* pool) const override { + return pool->add(VectorizedFnCall::create_unique(*this).release()); + } const std::string& expr_name() const override; std::string debug_string() const override; static std::string debug_string(const std::vector& exprs); diff --git a/be/src/vec/exprs/vexpr.cpp b/be/src/vec/exprs/vexpr.cpp index 49995b89a8..ea07a3359f 100644 --- a/be/src/vec/exprs/vexpr.cpp +++ b/be/src/vec/exprs/vexpr.cpp @@ -131,8 +131,7 @@ void VExpr::close(doris::RuntimeState* state, VExprContext* context, } } -Status VExpr::create_expr(doris::ObjectPool* pool, const doris::TExprNode& texpr_node, - VExpr** expr) { +Status VExpr::create_expr(ObjectPool* pool, const doris::TExprNode& texpr_node, VExpr** expr) { try { switch (texpr_node.node_type) { case TExprNodeType::BOOL_LITERAL: @@ -144,39 +143,39 @@ Status VExpr::create_expr(doris::ObjectPool* pool, const doris::TExprNode& texpr case TExprNodeType::STRING_LITERAL: case TExprNodeType::JSON_LITERAL: case TExprNodeType::NULL_LITERAL: { - *expr = pool->add(new VLiteral(texpr_node)); + *expr = pool->add(VLiteral::create_unique(texpr_node).release()); break; } case TExprNodeType::ARRAY_LITERAL: { - *expr = pool->add(new VArrayLiteral(texpr_node)); + *expr = pool->add(VArrayLiteral::create_unique(texpr_node).release()); break; } case TExprNodeType::MAP_LITERAL: { - *expr = pool->add(new VMapLiteral(texpr_node)); + *expr = pool->add(VMapLiteral::create_unique(texpr_node).release()); break; } case TExprNodeType::STRUCT_LITERAL: { - *expr = pool->add(new VStructLiteral(texpr_node)); + *expr = pool->add(VStructLiteral::create_unique(texpr_node).release()); break; } case doris::TExprNodeType::SLOT_REF: { - *expr = pool->add(new VSlotRef(texpr_node)); + *expr = pool->add(VSlotRef::create_unique(texpr_node).release()); break; } case doris::TExprNodeType::COLUMN_REF: { - *expr = pool->add(new VColumnRef(texpr_node)); + *expr = pool->add(VColumnRef::create_unique(texpr_node).release()); break; } case doris::TExprNodeType::COMPOUND_PRED: { - *expr = pool->add(new VcompoundPred(texpr_node)); + *expr = pool->add(VcompoundPred::create_unique(texpr_node).release()); break; } case doris::TExprNodeType::LAMBDA_FUNCTION_EXPR: { - *expr = pool->add(new VLambdaFunctionExpr(texpr_node)); + *expr = pool->add(VLambdaFunctionExpr::create_unique(texpr_node).release()); break; } case doris::TExprNodeType::LAMBDA_FUNCTION_CALL_EXPR: { - *expr = pool->add(new VLambdaFunctionCallExpr(texpr_node)); + *expr = pool->add(VLambdaFunctionCallExpr::create_unique(texpr_node).release()); break; } case doris::TExprNodeType::ARITHMETIC_EXPR: @@ -184,34 +183,34 @@ Status VExpr::create_expr(doris::ObjectPool* pool, const doris::TExprNode& texpr case doris::TExprNodeType::FUNCTION_CALL: case doris::TExprNodeType::COMPUTE_FUNCTION_CALL: case doris::TExprNodeType::MATCH_PRED: { - *expr = pool->add(new VectorizedFnCall(texpr_node)); + *expr = pool->add(VectorizedFnCall::create_unique(texpr_node).release()); break; } case doris::TExprNodeType::CAST_EXPR: { - *expr = pool->add(new VCastExpr(texpr_node)); + *expr = pool->add(VCastExpr::create_unique(texpr_node).release()); break; } case doris::TExprNodeType::IN_PRED: { - *expr = pool->add(new VInPredicate(texpr_node)); + *expr = pool->add(VInPredicate::create_unique(texpr_node).release()); break; } case doris::TExprNodeType::CASE_EXPR: { if (!texpr_node.__isset.case_expr) { return Status::InternalError("Case expression not set in thrift node"); } - *expr = pool->add(new VCaseExpr(texpr_node)); + *expr = pool->add(VCaseExpr::create_unique(texpr_node).release()); break; } case TExprNodeType::INFO_FUNC: { - *expr = pool->add(new VInfoFunc(texpr_node)); + *expr = pool->add(VInfoFunc::create_unique(texpr_node).release()); break; } case TExprNodeType::TUPLE_IS_NULL_PRED: { - *expr = pool->add(new VTupleIsNullPredicate(texpr_node)); + *expr = pool->add(VTupleIsNullPredicate::create_unique(texpr_node).release()); break; } case TExprNodeType::SCHEMA_CHANGE_EXPR: { - *expr = pool->add(new VSchemaChangeExpr(texpr_node)); + *expr = pool->add(VSchemaChangeExpr::create_unique(texpr_node).release()); break; } default: @@ -243,7 +242,7 @@ Status VExpr::create_tree_from_thrift(doris::ObjectPool* pool, DCHECK(root_expr != nullptr); DCHECK(ctx != nullptr); *root_expr = root; - *ctx = pool->add(new VExprContext(root)); + *ctx = pool->add(VExprContext::create_unique(root).release()); // short path for leaf node if (root_children <= 0) { return Status::OK(); diff --git a/be/src/vec/exprs/vexpr.h b/be/src/vec/exprs/vexpr.h index 168975d449..f4b684499f 100644 --- a/be/src/vec/exprs/vexpr.h +++ b/be/src/vec/exprs/vexpr.h @@ -29,6 +29,7 @@ #include #include +#include "common/factory_creator.h" #include "common/status.h" #include "runtime/define_primitive_type.h" #include "runtime/types.h" @@ -59,6 +60,9 @@ class VExprContext; RETURN_IF_ERROR(stmt); \ } +// VExpr should be used as shared pointer because it will be passed between classes +// like runtime filter to scan node, or from scannode to scanner. We could not make sure +// the relatioinship between threads and classes. class VExpr { public: // resize inserted param column to make sure column size equal to block.rows() @@ -242,5 +246,8 @@ protected: bool _prepared; }; +using VExprSPtr = std::shared_ptr; +using VExprUPtr = std::unique_ptr; + } // namespace vectorized } // namespace doris diff --git a/be/src/vec/exprs/vexpr_context.cpp b/be/src/vec/exprs/vexpr_context.cpp index 73cd294e2d..a22b61e33a 100644 --- a/be/src/vec/exprs/vexpr_context.cpp +++ b/be/src/vec/exprs/vexpr_context.cpp @@ -92,7 +92,7 @@ doris::Status VExprContext::clone(RuntimeState* state, VExprContext** new_ctx) { DCHECK(_opened); DCHECK(*new_ctx == nullptr); - *new_ctx = state->obj_pool()->add(new VExprContext(_root)); + *new_ctx = state->obj_pool()->add(VExprContext::create_unique(_root).release()); for (auto& _fn_context : _fn_contexts) { (*new_ctx)->_fn_contexts.push_back(_fn_context->clone()); } diff --git a/be/src/vec/exprs/vexpr_context.h b/be/src/vec/exprs/vexpr_context.h index d9b61b7732..0ebc2ae422 100644 --- a/be/src/vec/exprs/vexpr_context.h +++ b/be/src/vec/exprs/vexpr_context.h @@ -22,6 +22,7 @@ #include #include +#include "common/factory_creator.h" #include "common/status.h" #include "runtime/types.h" #include "udf/udf.h" @@ -36,6 +37,8 @@ namespace doris::vectorized { class VExpr; class VExprContext { + ENABLE_FACTORY_CREATOR(VExprContext); + public: VExprContext(VExpr* expr); ~VExprContext(); diff --git a/be/src/vec/exprs/vin_predicate.h b/be/src/vec/exprs/vin_predicate.h index 2826308158..925f7b4ce0 100644 --- a/be/src/vec/exprs/vin_predicate.h +++ b/be/src/vec/exprs/vin_predicate.h @@ -37,6 +37,8 @@ class VExprContext; namespace doris::vectorized { class VInPredicate final : public VExpr { + ENABLE_FACTORY_CREATOR(VInPredicate); + public: VInPredicate(const TExprNode& node); ~VInPredicate() override = default; @@ -49,7 +51,7 @@ public: void close(doris::RuntimeState* state, VExprContext* context, FunctionContext::FunctionStateScope scope) override; VExpr* clone(doris::ObjectPool* pool) const override { - return pool->add(new VInPredicate(*this)); + return pool->add(VInPredicate::create_unique(*this).release()); } const std::string& expr_name() const override; diff --git a/be/src/vec/exprs/vinfo_func.h b/be/src/vec/exprs/vinfo_func.h index 3cbf13ea0f..6b2d9e90b9 100644 --- a/be/src/vec/exprs/vinfo_func.h +++ b/be/src/vec/exprs/vinfo_func.h @@ -32,12 +32,14 @@ class Block; class VExprContext; class VInfoFunc : public VExpr { + ENABLE_FACTORY_CREATOR(VInfoFunc); + public: VInfoFunc(const TExprNode& node); virtual ~VInfoFunc() {} virtual VExpr* clone(doris::ObjectPool* pool) const override { - return pool->add(new VInfoFunc(*this)); + return pool->add(VInfoFunc::create_unique(*this).release()); } virtual const std::string& expr_name() const override { return _expr_name; } virtual Status execute(VExprContext* context, vectorized::Block* block, diff --git a/be/src/vec/exprs/vlambda_function_call_expr.h b/be/src/vec/exprs/vlambda_function_call_expr.h index f2957ee29e..302b2e8827 100644 --- a/be/src/vec/exprs/vlambda_function_call_expr.h +++ b/be/src/vec/exprs/vlambda_function_call_expr.h @@ -28,12 +28,14 @@ namespace doris::vectorized { class VLambdaFunctionCallExpr : public VExpr { + ENABLE_FACTORY_CREATOR(VLambdaFunctionCallExpr); + public: VLambdaFunctionCallExpr(const TExprNode& node) : VExpr(node) {} ~VLambdaFunctionCallExpr() override = default; VExpr* clone(ObjectPool* pool) const override { - return pool->add(new VLambdaFunctionCallExpr(*this)); + return pool->add(VLambdaFunctionCallExpr::create_unique(*this).release()); } doris::Status prepare(doris::RuntimeState* state, const doris::RowDescriptor& desc, diff --git a/be/src/vec/exprs/vlambda_function_expr.h b/be/src/vec/exprs/vlambda_function_expr.h index af561a2213..490d17d01b 100644 --- a/be/src/vec/exprs/vlambda_function_expr.h +++ b/be/src/vec/exprs/vlambda_function_expr.h @@ -22,6 +22,8 @@ namespace doris::vectorized { class VLambdaFunctionExpr final : public VExpr { + ENABLE_FACTORY_CREATOR(VLambdaFunctionExpr); + public: VLambdaFunctionExpr(const TExprNode& node) : VExpr(node) {} ~VLambdaFunctionExpr() override = default; @@ -32,7 +34,7 @@ public: } VExpr* clone(doris::ObjectPool* pool) const override { - return pool->add(new VLambdaFunctionExpr(*this)); + return pool->add(VLambdaFunctionExpr::create_unique(*this).release()); } const std::string& expr_name() const override { return _expr_name; } diff --git a/be/src/vec/exprs/vliteral.h b/be/src/vec/exprs/vliteral.h index 456375487a..733b1df8d2 100644 --- a/be/src/vec/exprs/vliteral.h +++ b/be/src/vec/exprs/vliteral.h @@ -33,6 +33,8 @@ class Block; class VExprContext; class VLiteral : public VExpr { + ENABLE_FACTORY_CREATOR(VLiteral); + public: VLiteral(const TExprNode& node, bool should_init = true) : VExpr(node), _expr_name(_data_type->get_name()) { @@ -42,7 +44,9 @@ public: } Status execute(VExprContext* context, vectorized::Block* block, int* result_column_id) override; const std::string& expr_name() const override { return _expr_name; } - VExpr* clone(doris::ObjectPool* pool) const override { return pool->add(new VLiteral(*this)); } + VExpr* clone(doris::ObjectPool* pool) const override { + return pool->add(VLiteral::create_unique(*this).release()); + } std::string debug_string() const override; std::string value() const; diff --git a/be/src/vec/exprs/vmap_literal.h b/be/src/vec/exprs/vmap_literal.h index 173ff0dcd3..c107fe3e20 100644 --- a/be/src/vec/exprs/vmap_literal.h +++ b/be/src/vec/exprs/vmap_literal.h @@ -28,6 +28,8 @@ namespace vectorized { class VExprContext; class VMapLiteral : public VLiteral { + ENABLE_FACTORY_CREATOR(VMapLiteral); + public: VMapLiteral(const TExprNode& node) : VLiteral(node, false) {} ~VMapLiteral() override = default; diff --git a/be/src/vec/exprs/vruntimefilter_wrapper.h b/be/src/vec/exprs/vruntimefilter_wrapper.h index b2a03db8ba..7e31513824 100644 --- a/be/src/vec/exprs/vruntimefilter_wrapper.h +++ b/be/src/vec/exprs/vruntimefilter_wrapper.h @@ -41,6 +41,8 @@ class VExprContext; namespace doris::vectorized { class VRuntimeFilterWrapper final : public VExpr { + ENABLE_FACTORY_CREATOR(VRuntimeFilterWrapper); + public: VRuntimeFilterWrapper(const TExprNode& node, VExpr* impl); VRuntimeFilterWrapper(const VRuntimeFilterWrapper& vexpr); @@ -56,7 +58,7 @@ public: void close(doris::RuntimeState* state, VExprContext* context, FunctionContext::FunctionStateScope scope) override; VExpr* clone(doris::ObjectPool* pool) const override { - return pool->add(new VRuntimeFilterWrapper(*this)); + return pool->add(VRuntimeFilterWrapper::create_unique(*this).release()); } const std::string& expr_name() const override; const std::vector& children() const override { return _impl->children(); } diff --git a/be/src/vec/exprs/vschema_change_expr.cpp b/be/src/vec/exprs/vschema_change_expr.cpp index 0e0e71e167..c8da5fccf3 100644 --- a/be/src/vec/exprs/vschema_change_expr.cpp +++ b/be/src/vec/exprs/vschema_change_expr.cpp @@ -84,8 +84,8 @@ Status VSchemaChangeExpr::execute(VExprContext* context, doris::vectorized::Bloc ColumnObject& object_column = *assert_cast( block->get_by_position(_column_id).column->assume_mutable().get()); CHECK(object_column.is_finalized()); - std::unique_ptr full_base_schema_view; - full_base_schema_view.reset(new vectorized::schema_util::FullBaseSchemaView); + std::unique_ptr full_base_schema_view = + vectorized::schema_util::FullBaseSchemaView::create_unique(); full_base_schema_view->table_id = _table_id; vectorized::ColumnsWithTypeAndName cols_with_type_name; cols_with_type_name.reserve(object_column.get_subcolumns().size()); diff --git a/be/src/vec/exprs/vschema_change_expr.h b/be/src/vec/exprs/vschema_change_expr.h index afa35f6b4c..fb0dae796c 100644 --- a/be/src/vec/exprs/vschema_change_expr.h +++ b/be/src/vec/exprs/vschema_change_expr.h @@ -43,6 +43,8 @@ namespace doris::vectorized { // from it's type and name.It contains an inner slot which indicated it's variant // column. class VSchemaChangeExpr : public VExpr { + ENABLE_FACTORY_CREATOR(VSchemaChangeExpr); + public: VSchemaChangeExpr(const TExprNode& node) : VExpr(node), _tnode(node) {} ~VSchemaChangeExpr() = default; @@ -55,7 +57,7 @@ public: void close(doris::RuntimeState* state, VExprContext* context, FunctionContext::FunctionStateScope scope) override; VExpr* clone(doris::ObjectPool* pool) const override { - return pool->add(new VSchemaChangeExpr(*this)); + return pool->add(VSchemaChangeExpr::create_unique(*this).release()); } const std::string& expr_name() const override; std::string debug_string() const override; diff --git a/be/src/vec/exprs/vslot_ref.h b/be/src/vec/exprs/vslot_ref.h index 658e0f05ac..6db7238597 100644 --- a/be/src/vec/exprs/vslot_ref.h +++ b/be/src/vec/exprs/vslot_ref.h @@ -33,6 +33,8 @@ class Block; class VExprContext; class VSlotRef final : public VExpr { + ENABLE_FACTORY_CREATOR(VSlotRef); + public: VSlotRef(const doris::TExprNode& node); VSlotRef(const SlotDescriptor* desc); @@ -41,7 +43,7 @@ public: virtual doris::Status prepare(doris::RuntimeState* state, const doris::RowDescriptor& desc, VExprContext* context) override; virtual VExpr* clone(doris::ObjectPool* pool) const override { - return pool->add(new VSlotRef(*this)); + return pool->add(VSlotRef::create_unique(*this).release()); } virtual const std::string& expr_name() const override; diff --git a/be/src/vec/exprs/vstruct_literal.h b/be/src/vec/exprs/vstruct_literal.h index 7af79946b7..8cd9683774 100644 --- a/be/src/vec/exprs/vstruct_literal.h +++ b/be/src/vec/exprs/vstruct_literal.h @@ -29,6 +29,8 @@ namespace vectorized { class VExprContext; class VStructLiteral : public VLiteral { + ENABLE_FACTORY_CREATOR(VStructLiteral); + public: VStructLiteral(const TExprNode& node) : VLiteral(node, false) {} ~VStructLiteral() override = default; diff --git a/be/src/vec/exprs/vtuple_is_null_predicate.h b/be/src/vec/exprs/vtuple_is_null_predicate.h index 91d217a756..d927165ef6 100644 --- a/be/src/vec/exprs/vtuple_is_null_predicate.h +++ b/be/src/vec/exprs/vtuple_is_null_predicate.h @@ -37,6 +37,8 @@ class VExprContext; namespace doris::vectorized { class VTupleIsNullPredicate final : public VExpr { + ENABLE_FACTORY_CREATOR(VTupleIsNullPredicate); + public: explicit VTupleIsNullPredicate(const TExprNode& node); ~VTupleIsNullPredicate() override = default; @@ -46,7 +48,7 @@ public: VExprContext* context) override; VExpr* clone(doris::ObjectPool* pool) const override { - return pool->add(new VTupleIsNullPredicate(*this)); + return pool->add(VTupleIsNullPredicate::create_unique(*this).release()); } [[nodiscard]] bool is_constant() const override { return false; } diff --git a/be/src/vec/utils/util.hpp b/be/src/vec/utils/util.hpp index 97ab412160..91e7de7afa 100644 --- a/be/src/vec/utils/util.hpp +++ b/be/src/vec/utils/util.hpp @@ -130,9 +130,8 @@ ThriftStruct from_json_string(const std::string& json_val) { using namespace apache::thrift::transport; using namespace apache::thrift::protocol; ThriftStruct ts; - TMemoryBuffer* buffer = - new TMemoryBuffer((uint8_t*)json_val.c_str(), (uint32_t)json_val.size()); - std::shared_ptr trans(buffer); + std::shared_ptr trans = + std::make_shared((uint8_t*)json_val.c_str(), (uint32_t)json_val.size()); TJSONProtocol protocol(trans); ts.read(&protocol); return ts; diff --git a/be/test/exprs/runtime_filter_test.cpp b/be/test/exprs/runtime_filter_test.cpp index 5f59ab0d7f..b8cce3fbf7 100644 --- a/be/test/exprs/runtime_filter_test.cpp +++ b/be/test/exprs/runtime_filter_test.cpp @@ -38,8 +38,8 @@ public: virtual void SetUp() { ExecEnv* exec_env = ExecEnv::GetInstance(); exec_env = nullptr; - _runtime_stat.reset( - new RuntimeState(_fragment_id, _query_options, _query_globals, exec_env)); + _runtime_stat = + RuntimeState::create_unique(_fragment_id, _query_options, _query_globals, exec_env); _runtime_stat->init_mem_trackers(); } virtual void TearDown() { _obj_pool.clear(); } diff --git a/be/test/runtime/test_env.cc b/be/test/runtime/test_env.cc index 90cba77566..785bdbcc99 100644 --- a/be/test/runtime/test_env.cc +++ b/be/test/runtime/test_env.cc @@ -45,13 +45,6 @@ TestEnv::~TestEnv() { SAFE_DELETE(_engine); } -RuntimeState* TestEnv::create_runtime_state(int64_t query_id) { - TExecPlanFragmentParams plan_params = TExecPlanFragmentParams(); - plan_params.params.query_id.hi = 0; - plan_params.params.query_id.lo = query_id; - return new RuntimeState(plan_params.params, TQueryOptions(), TQueryGlobals(), _exec_env); -} - void TestEnv::tear_down_query_states() { _query_states.clear(); } diff --git a/be/test/runtime/test_env.h b/be/test/runtime/test_env.h index 06993c899e..be168459cb 100644 --- a/be/test/runtime/test_env.h +++ b/be/test/runtime/test_env.h @@ -44,9 +44,6 @@ public: ExecEnv* exec_env() { return _exec_env; } private: - // Create a new RuntimeState sharing global environment. - RuntimeState* create_runtime_state(int64_t query_id); - ExecEnv* _exec_env; // Per-query states with associated block managers. diff --git a/be/test/testutil/function_utils.cpp b/be/test/testutil/function_utils.cpp index 7017c38003..a58d3b826a 100644 --- a/be/test/testutil/function_utils.cpp +++ b/be/test/testutil/function_utils.cpp @@ -29,7 +29,7 @@ FunctionUtils::FunctionUtils() { globals.__set_now_string("2019-08-06 01:38:57"); globals.__set_timestamp_ms(1565026737805); globals.__set_time_zone("Asia/Shanghai"); - _state = new RuntimeState(globals); + _state = RuntimeState::create_unique(globals).release(); doris::TypeDescriptor return_type; std::vector arg_types; _fn_ctx = FunctionContext::create_context(_state, return_type, arg_types); @@ -42,7 +42,7 @@ FunctionUtils::FunctionUtils(const doris::TypeDescriptor& return_type, globals.__set_now_string("2019-08-06 01:38:57"); globals.__set_timestamp_ms(1565026737805); globals.__set_time_zone("Asia/Shanghai"); - _state = new RuntimeState(globals); + _state = RuntimeState::create_unique(globals).release(); _fn_ctx = FunctionContext::create_context(_state, return_type, arg_types); }