diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp index 1787920ae3..970aec83ee 100644 --- a/be/src/runtime/runtime_state.cpp +++ b/be/src/runtime/runtime_state.cpp @@ -264,6 +264,11 @@ Status RuntimeState::init_mem_trackers(const TUniqueId& query_id) { return Status::OK(); } +Status RuntimeState::init_instance_mem_tracker() { + _instance_mem_tracker.reset(new MemTracker(-1)); + return Status::OK(); +} + Status RuntimeState::init_buffer_poolstate() { ExecEnv* exec_env = ExecEnv::GetInstance(); int64_t mem_limit = _query_mem_tracker->lowest_limit(); diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index e771c86c96..41c399d46e 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -93,6 +93,9 @@ public: // This function also initializes a user function mem tracker (in the fourth level). Status init_mem_trackers(const TUniqueId& query_id); + // for ut only + Status init_instance_mem_tracker(); + /// Called from Init() to set up buffer reservations and the file group. Status init_buffer_poolstate(); diff --git a/be/test/runtime/memory_scratch_sink_test.cpp b/be/test/runtime/memory_scratch_sink_test.cpp index 00b6bf4e62..f966530f44 100644 --- a/be/test/runtime/memory_scratch_sink_test.cpp +++ b/be/test/runtime/memory_scratch_sink_test.cpp @@ -37,17 +37,14 @@ #include "runtime/tuple_row.h" #include "util/blocking_queue.hpp" #include "util/logging.h" +#include "testutil/desc_tbl_builder.h" namespace doris { class MemoryScratchSinkTest : public testing::Test { public: MemoryScratchSinkTest() { - // _exec_env = doris::ExecEnv::GetInstance(); - // std::vector paths; - // doris::parse_conf_store_paths(doris::config::storage_root_path, &paths); - // doris::ExecEnv::init(_exec_env, paths); - // all below is just only for test MemoryScratchSink + // all below is just only for test MemoryScratchSink ResultQueueMgr* result_queue_mgr = new ResultQueueMgr(); ThreadResourceMgr* thread_mgr = new ThreadResourceMgr(); _exec_env._result_queue_mgr = result_queue_mgr; @@ -58,6 +55,7 @@ public: query_id.lo = 10; query_id.hi = 100; _runtime_state = new RuntimeState(query_id, query_options, TQueryGlobals(), &_exec_env); + _runtime_state->init_instance_mem_tracker(); { TExpr expr; { @@ -75,6 +73,12 @@ public: } _mem_tracker = new MemTracker(-1, "MemoryScratchSinkTest", _runtime_state->instance_mem_tracker()); + // init _row_desc + vector nullable_tuples(1, false); + vector tuple_ids(1, static_cast(0)); + DescriptorTblBuilder int_builder(&_pool); + int_builder.declare_tuple() << TYPE_INT; + _row_desc = _pool.add(new RowDescriptor(*int_builder.build(), tuple_ids, nullable_tuples)); } virtual ~MemoryScratchSinkTest() { delete _runtime_state; @@ -86,22 +90,22 @@ protected: } private: + ObjectPool _pool; ExecEnv _exec_env; std::vector _exprs; RuntimeState* _runtime_state; - RowDescriptor _row_desc; + RowDescriptor* _row_desc; TMemoryScratchSink _tsink; MemTracker *_mem_tracker; }; TEST_F(MemoryScratchSinkTest, work_flow_normal) { - MemoryScratchSink sink(_row_desc, _exprs, _tsink); + MemoryScratchSink sink(*_row_desc, _exprs, _tsink); TDataSink data_sink; data_sink.memory_scratch_sink = _tsink; ASSERT_TRUE(sink.init(data_sink).ok()); ASSERT_TRUE(sink.prepare(_runtime_state).ok()); - ASSERT_TRUE(sink.prepare(_runtime_state).ok()); - RowBatch row_batch(_row_desc, 1024, _mem_tracker); + RowBatch row_batch(*_row_desc, 1024, _mem_tracker); row_batch.add_row(); row_batch.commit_last_row(); ASSERT_TRUE(sink.send(_runtime_state, &row_batch).ok());