diff --git a/be/CMakeLists.txt b/be/CMakeLists.txt index f90d293435..3ae2b6d44c 100644 --- a/be/CMakeLists.txt +++ b/be/CMakeLists.txt @@ -519,13 +519,15 @@ add_subdirectory(${SRC_DIR}/plugin) # Utility CMake function to make specifying tests and benchmarks less verbose FUNCTION(ADD_BE_TEST TEST_NAME) + # use argn to add additional files + set(ADDITIONAL_FILES ${ARGN}) set(BUILD_OUTPUT_ROOT_DIRECTORY "${CMAKE_CURRENT_BINARY_DIR}/") # This gets the directory where the test is from (e.g. 'exprs' or 'runtime') get_filename_component(DIR_NAME ${CMAKE_CURRENT_SOURCE_DIR} NAME) get_filename_component(TEST_DIR_NAME ${TEST_NAME} PATH) get_filename_component(TEST_FILE_NAME ${TEST_NAME} NAME) - ADD_EXECUTABLE(${TEST_FILE_NAME} ${TEST_NAME}.cpp) + ADD_EXECUTABLE(${TEST_FILE_NAME} ${TEST_NAME}.cpp ${ADDITIONAL_FILES}) TARGET_LINK_LIBRARIES(${TEST_FILE_NAME} ${TEST_LINK_LIBS}) SET_TARGET_PROPERTIES(${TEST_FILE_NAME} PROPERTIES COMPILE_FLAGS "-fno-access-control") if (NOT "${TEST_DIR_NAME}" STREQUAL "") diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp index 0147443217..0385650498 100644 --- a/be/src/olap/storage_engine.cpp +++ b/be/src/olap/storage_engine.cpp @@ -140,6 +140,10 @@ StorageEngine::~StorageEngine() { DEREGISTER_HOOK_METRIC(unused_rowsets_count); DEREGISTER_HOOK_METRIC(compaction_mem_current_consumption); _clear(); + + if(_compaction_thread_pool){ + _compaction_thread_pool->shutdown(); + } } void StorageEngine::load_data_dirs(const std::vector& data_dirs) { diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt index db32d1ae2c..7bd7f541ff 100644 --- a/be/src/runtime/CMakeLists.txt +++ b/be/src/runtime/CMakeLists.txt @@ -67,7 +67,6 @@ set(RUNTIME_FILES disk_io_mgr_reader_context.cc disk_io_mgr_scan_range.cc buffered_block_mgr2.cc - test_env.cc mem_tracker.cpp spill_sorter.cc sorted_run_merger.cc diff --git a/be/src/runtime/disk_io_mgr.cc b/be/src/runtime/disk_io_mgr.cc index 793e496a96..af1cde7a62 100644 --- a/be/src/runtime/disk_io_mgr.cc +++ b/be/src/runtime/disk_io_mgr.cc @@ -404,9 +404,8 @@ Status DiskIoMgr::init(const std::shared_ptr& process_mem_tracker) { return Status::OK(); } -Status DiskIoMgr::register_context(RequestContext** request_context, - std::shared_ptr mem_tracker) { - DCHECK(_request_context_cache.get() != NULL) << "Must call init() first."; +Status DiskIoMgr::register_context(RequestContext** request_context, std::shared_ptr mem_tracker) { + DCHECK(_request_context_cache) << "Must call init() first."; *request_context = _request_context_cache->get_new_context(); (*request_context)->reset(std::move(mem_tracker)); return Status::OK(); diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index 9a73058a71..cbcf3d77ed 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -361,8 +361,6 @@ public: int64_t get_load_mem_limit(); private: - // Allow TestEnv to set block_mgr manually for testing. - friend class TestEnv; // Use a custom block manager for the query for testing purposes. void set_block_mgr2(const boost::shared_ptr& block_mgr) { diff --git a/be/src/runtime/test_env.cc b/be/src/runtime/test_env.cc deleted file mode 100644 index ef50dcfe94..0000000000 --- a/be/src/runtime/test_env.cc +++ /dev/null @@ -1,100 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "runtime/test_env.h" - -#include "util/disk_info.h" -#include "util/doris_metrics.h" - -using boost::shared_ptr; - -namespace doris { - -TestEnv::TestEnv() { - _exec_env.reset(new ExecEnv()); - // _exec_env->init_for_tests(); - _io_mgr_tracker.reset(new MemTracker(-1)); - _block_mgr_parent_tracker.reset(new MemTracker(-1)); - _exec_env->disk_io_mgr()->init(_io_mgr_tracker); - _tmp_file_mgr.reset(new TmpFileMgr()); - _tmp_file_mgr->init(); -} - -void TestEnv::init_tmp_file_mgr(const std::vector& tmp_dirs, bool one_dir_per_device) { - // Need to recreate metrics to avoid error when registering metric twice. - _tmp_file_mgr.reset(new TmpFileMgr()); - _tmp_file_mgr->init_custom(tmp_dirs, one_dir_per_device); -} - -TestEnv::~TestEnv() { - // Queries must be torn down first since they are dependent on global state. - tear_down_query_states(); - _block_mgr_parent_tracker.reset(); - _exec_env.reset(); - _io_mgr_tracker.reset(); - _tmp_file_mgr.reset(); -} - -RuntimeState* TestEnv::create_runtime_state(int64_t query_id) { - TExecPlanFragmentParams plan_params = TExecPlanFragmentParams(); - plan_params.params.query_id.hi = 0; - plan_params.params.query_id.lo = query_id; - return new RuntimeState(plan_params.params, TQueryOptions(), TQueryGlobals(), _exec_env.get()); -} - -Status TestEnv::create_query_state(int64_t query_id, int max_buffers, int block_size, - RuntimeState** runtime_state) { - *runtime_state = create_runtime_state(query_id); - if (*runtime_state == NULL) { - return Status::InternalError("Unexpected error creating RuntimeState"); - } - - shared_ptr mgr; - RETURN_IF_ERROR(BufferedBlockMgr2::create( - *runtime_state, _block_mgr_parent_tracker, (*runtime_state)->runtime_profile(), - _tmp_file_mgr.get(), calculate_mem_tracker(max_buffers, block_size), block_size, &mgr)); - (*runtime_state)->set_block_mgr2(mgr); - // (*runtime_state)->_block_mgr = mgr; - - _query_states.push_back(shared_ptr(*runtime_state)); - return Status::OK(); -} - -Status TestEnv::create_query_states(int64_t start_query_id, int num_mgrs, int buffers_per_mgr, - int block_size, std::vector* runtime_states) { - for (int i = 0; i < num_mgrs; ++i) { - RuntimeState* runtime_state = NULL; - RETURN_IF_ERROR(create_query_state(start_query_id + i, buffers_per_mgr, block_size, - &runtime_state)); - runtime_states->push_back(runtime_state); - } - return Status::OK(); -} - -void TestEnv::tear_down_query_states() { - _query_states.clear(); -} - -int64_t TestEnv::calculate_mem_tracker(int max_buffers, int block_size) { - DCHECK_GE(max_buffers, -1); - if (max_buffers == -1) { - return -1; - } - return max_buffers * static_cast(block_size); -} - -} // end namespace doris diff --git a/be/src/runtime/tmp_file_mgr.h b/be/src/runtime/tmp_file_mgr.h index 04277a5b88..58ba7ae1d0 100644 --- a/be/src/runtime/tmp_file_mgr.h +++ b/be/src/runtime/tmp_file_mgr.h @@ -170,7 +170,7 @@ private: bool is_blacklisted(DeviceId device_id); ExecEnv* _exec_env; - bool _initialized; + bool _initialized = false; // Protects the status of tmp dirs (i.e. whether they're blacklisted). SpinLock _dir_status_lock; diff --git a/be/test/runtime/CMakeLists.txt b/be/test/runtime/CMakeLists.txt index c3bab67a23..0d0e33c427 100644 --- a/be/test/runtime/CMakeLists.txt +++ b/be/test/runtime/CMakeLists.txt @@ -57,7 +57,7 @@ ADD_BE_TEST(small_file_mgr_test) ADD_BE_TEST(heartbeat_flags_test) ADD_BE_TEST(result_queue_mgr_test) -ADD_BE_TEST(memory_scratch_sink_test) +ADD_BE_TEST(memory_scratch_sink_test test_env.cc) ADD_BE_TEST(external_scan_context_mgr_test) ADD_BE_TEST(memory/chunk_allocator_test) diff --git a/be/test/runtime/memory_scratch_sink_test.cpp b/be/test/runtime/memory_scratch_sink_test.cpp index b1d5a1feb7..a0d79a023f 100644 --- a/be/test/runtime/memory_scratch_sink_test.cpp +++ b/be/test/runtime/memory_scratch_sink_test.cpp @@ -33,14 +33,11 @@ #include "gen_cpp/Types_types.h" #include "olap/options.h" #include "olap/row.h" -#include "runtime/bufferpool/reservation_tracker.h" -#include "runtime/exec_env.h" #include "runtime/mem_tracker.h" #include "runtime/primitive_type.h" -#include "runtime/result_queue_mgr.h" #include "runtime/row_batch.h" #include "runtime/runtime_state.h" -#include "runtime/thread_resource_mgr.h" +#include "runtime/test_env.h" #include "runtime/tuple_row.h" #include "testutil/desc_tbl_builder.h" #include "util/blocking_queue.hpp" @@ -51,6 +48,7 @@ namespace doris { class MemoryScratchSinkTest : public testing::Test { public: MemoryScratchSinkTest() { + _env = std::make_shared(); { TExpr expr; { @@ -67,12 +65,7 @@ public: } } - ~MemoryScratchSinkTest() { - delete _state; - delete _exec_env->_result_queue_mgr; - delete _exec_env->_thread_mgr; - delete _exec_env->_buffer_reservation; - } + ~MemoryScratchSinkTest() { delete _state; } virtual void SetUp() { config::periodic_counter_update_period_ms = 500; @@ -96,7 +89,7 @@ public: private: ObjectPool _obj_pool; - ExecEnv* _exec_env = nullptr; + std::shared_ptr _env; // std::vector _exprs; TDescriptorTable _t_desc_table; RuntimeState* _state = nullptr; @@ -109,21 +102,17 @@ private: }; void MemoryScratchSinkTest::init() { - _exec_env = ExecEnv::GetInstance(); init_desc_tbl(); init_runtime_state(); } void MemoryScratchSinkTest::init_runtime_state() { - _exec_env->_result_queue_mgr = new ResultQueueMgr(); - _exec_env->_thread_mgr = new ThreadResourceMgr(); - _exec_env->_buffer_reservation = new ReservationTracker(); TQueryOptions query_options; query_options.batch_size = 1024; TUniqueId query_id; query_id.lo = 10; query_id.hi = 100; - _state = new RuntimeState(query_id, query_options, TQueryGlobals(), _exec_env); + _state = new RuntimeState(query_id, query_options, TQueryGlobals(), _env->exec_env()); _state->init_instance_mem_tracker(); _mem_tracker = MemTracker::CreateTracker(-1, "MemoryScratchSinkTest", _state->instance_mem_tracker()); diff --git a/be/test/runtime/test_env.cc b/be/test/runtime/test_env.cc new file mode 100644 index 0000000000..c0594365ad --- /dev/null +++ b/be/test/runtime/test_env.cc @@ -0,0 +1,151 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "runtime/test_env.h" + +#include + +#include + +#include "olap/storage_engine.h" +#include "runtime/fragment_mgr.h" +#include "runtime/initial_reservations.h" +#include "runtime/result_queue_mgr.h" +#include "util/disk_info.h" +#include "util/priority_thread_pool.hpp" + +namespace doris { + +TestEnv::TestEnv() + : _block_mgr_parent_tracker(MemTracker::CreateTracker(-1, "block mgr parent")), + _io_mgr_tracker(MemTracker::CreateTracker(-1, "io mgr")) { + // Some code will use ExecEnv::GetInstance(), so init the global ExecEnv singleton + _exec_env = ExecEnv::GetInstance(); + _exec_env->_thread_mgr = new ThreadResourceMgr(2); + _exec_env->_buffer_reservation = new ReservationTracker(); + _exec_env->_mem_tracker = MemTracker::CreateTracker(-1, "TestEnv root"); + _exec_env->_disk_io_mgr = new DiskIoMgr(1, 1, 1, 10); + _exec_env->disk_io_mgr()->init(_io_mgr_tracker); + _exec_env->_thread_pool = new PriorityThreadPool(1, 16); + _exec_env->_result_queue_mgr = new ResultQueueMgr(); + // TODO may need rpc support, etc. +} + +void TestEnv::init_tmp_file_mgr(const std::vector& tmp_dirs, bool one_dir_per_device) { + _tmp_file_mgr = std::make_shared(); + _exec_env->_tmp_file_mgr = _tmp_file_mgr.get(); + + DiskInfo::init(); + // will use DiskInfo::num_disks(), DiskInfo should be initialized before + auto st = _tmp_file_mgr->init_custom(tmp_dirs, one_dir_per_device); + DCHECK(st.ok()) << st.get_error_msg(); +} + +void TestEnv::init_buffer_pool(int64_t min_page_len, int64_t capacity, int64_t clean_pages_limit) { + _exec_env->_buffer_pool = new BufferPool(min_page_len, capacity, clean_pages_limit); +} + +TestEnv::~TestEnv() { + SAFE_DELETE(_exec_env->_result_queue_mgr); + SAFE_DELETE(_exec_env->_buffer_pool); + SAFE_DELETE(_exec_env->_thread_pool); + SAFE_DELETE(_exec_env->_disk_io_mgr); + SAFE_DELETE(_exec_env->_buffer_reservation); + SAFE_DELETE(_exec_env->_thread_mgr); + + if (_engine == StorageEngine::_s_instance) { + // the engine instance is created by this test env + StorageEngine::_s_instance = nullptr; + } + SAFE_DELETE(_engine); +} + +RuntimeState* TestEnv::create_runtime_state(int64_t query_id) { + TExecPlanFragmentParams plan_params = TExecPlanFragmentParams(); + plan_params.params.query_id.hi = 0; + plan_params.params.query_id.lo = query_id; + return new RuntimeState(plan_params.params, TQueryOptions(), TQueryGlobals(), _exec_env); +} + +Status TestEnv::create_query_state(int64_t query_id, int max_buffers, int block_size, + RuntimeState** runtime_state) { + *runtime_state = create_runtime_state(query_id); + if (*runtime_state == nullptr) { + return Status::InternalError("Unexpected error creating RuntimeState"); + } + + boost::shared_ptr mgr; + RETURN_IF_ERROR(BufferedBlockMgr2::create( + *runtime_state, _block_mgr_parent_tracker, (*runtime_state)->runtime_profile(), + _tmp_file_mgr.get(), calculate_mem_tracker(max_buffers, block_size), block_size, &mgr)); + (*runtime_state)->set_block_mgr2(mgr); + // (*runtime_state)->_block_mgr = mgr; + + _query_states.push_back(std::shared_ptr(*runtime_state)); + return Status::OK(); +} + +Status TestEnv::create_query_states(int64_t start_query_id, int num_mgrs, int buffers_per_mgr, + int block_size, std::vector* runtime_states) { + for (int i = 0; i < num_mgrs; ++i) { + RuntimeState* runtime_state = nullptr; + RETURN_IF_ERROR(create_query_state(start_query_id + i, buffers_per_mgr, block_size, + &runtime_state)); + runtime_states->push_back(runtime_state); + } + return Status::OK(); +} + +void TestEnv::tear_down_query_states() { + _query_states.clear(); +} + +int64_t TestEnv::calculate_mem_tracker(int max_buffers, int block_size) { + DCHECK_GE(max_buffers, -1); + if (max_buffers == -1) { + return -1; + } + return max_buffers * static_cast(block_size); +} + +void TestEnv::init_storage_engine(bool need_open, const std::vector& paths) { + if (StorageEngine::_s_instance) { + LOG(INFO) << "Engine instance already exists"; + return; + } + // init and open storage engine + doris::EngineOptions options; + for (const auto& path : paths) { + options.store_paths.emplace_back(path, -1); + } + options.backend_uid = UniqueId::gen_uid(); + config::tablet_map_shard_size = 1; + config::txn_map_shard_size = 1; + config::txn_shard_size = 1; + + // This engine will be the singleton instance, cuz StorageEngine::_s_instance is nullptr now. + Status st; + if (need_open) { + st = StorageEngine::open(options, &_engine); + } else { + _engine = new StorageEngine(options); + } + DCHECK(st.ok()) << st.get_error_msg(); + _exec_env->set_storage_engine(_engine); +} + +} // end namespace doris diff --git a/be/src/runtime/test_env.h b/be/test/runtime/test_env.h similarity index 78% rename from be/src/runtime/test_env.h rename to be/test/runtime/test_env.h index 25917bc543..5212efc4da 100644 --- a/be/src/runtime/test_env.h +++ b/be/test/runtime/test_env.h @@ -25,8 +25,9 @@ namespace doris { -// Helper testing class that creates an environment with a buffered-block-mgr similar -// to the one Impala's runtime is using. +/// Helper testing class that creates an environment with runtime memory management +/// similar to the one used by the Doris runtime. Only one TestEnv can be active at a +/// time, because it modifies the global ExecEnv singleton. class TestEnv { public: TestEnv(); @@ -36,6 +37,11 @@ public: // query states have been created. void init_tmp_file_mgr(const std::vector& tmp_dirs, bool one_dir_per_device); + void init_buffer_pool(int64_t min_page_len, int64_t capacity, int64_t clean_pages_limit); + + // If don't need to open, paths can be empty. + void init_storage_engine(bool need_open, const std::vector& paths = {}); + // Create a RuntimeState for a query with a new block manager. The RuntimeState is // owned by the TestEnv. Status create_query_state(int64_t query_id, int max_buffers, int block_size, @@ -51,28 +57,26 @@ public: // Calculate memory limit accounting for overflow and negative values. // If max_buffers is -1, no memory limit will apply. - int64_t calculate_mem_tracker(int max_buffers, int block_size); + static int64_t calculate_mem_tracker(int max_buffers, int block_size); - ExecEnv* exec_env() { return _exec_env.get(); } + ExecEnv* exec_env() { return _exec_env; } std::shared_ptr block_mgr_parent_tracker() { return _block_mgr_parent_tracker; } MemTracker* io_mgr_tracker() { return _io_mgr_tracker.get(); } TmpFileMgr* tmp_file_mgr() { return _tmp_file_mgr.get(); } private: - // Recreate global metric groups. - void init_metrics(); - // Create a new RuntimeState sharing global environment. RuntimeState* create_runtime_state(int64_t query_id); - // Global state for test environment. - boost::scoped_ptr _exec_env; + ExecEnv* _exec_env; std::shared_ptr _block_mgr_parent_tracker; std::shared_ptr _io_mgr_tracker; - boost::scoped_ptr _tmp_file_mgr; + std::shared_ptr _tmp_file_mgr; // Per-query states with associated block managers. - std::vector> _query_states; + std::vector > _query_states; + + StorageEngine* _engine = nullptr; }; } // end namespace doris