// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. #include "runtime/test_env.h" #include #include #include "olap/storage_engine.h" #include "runtime/fragment_mgr.h" #include "runtime/initial_reservations.h" #include "runtime/memory/mem_tracker_task_pool.h" #include "runtime/result_queue_mgr.h" #include "util/disk_info.h" #include "util/priority_thread_pool.hpp" namespace doris { TestEnv::TestEnv() { // Some code will use ExecEnv::GetInstance(), so init the global ExecEnv singleton _exec_env = ExecEnv::GetInstance(); _exec_env->_thread_mgr = new ThreadResourceMgr(2); _exec_env->_buffer_reservation = new ReservationTracker(); _exec_env->_task_pool_mem_tracker_registry = new MemTrackerTaskPool(); _exec_env->_disk_io_mgr = new DiskIoMgr(1, 1, 1, 10); _exec_env->disk_io_mgr()->init(-1); _exec_env->_scan_thread_pool = new PriorityThreadPool(1, 16); _exec_env->_result_queue_mgr = new ResultQueueMgr(); // TODO may need rpc support, etc. } void TestEnv::init_tmp_file_mgr(const std::vector& tmp_dirs, bool one_dir_per_device) { _tmp_file_mgr = std::make_shared(); _exec_env->_tmp_file_mgr = _tmp_file_mgr.get(); DiskInfo::init(); // will use DiskInfo::num_disks(), DiskInfo should be initialized before auto st = _tmp_file_mgr->init_custom(tmp_dirs, one_dir_per_device); DCHECK(st.ok()) << st.get_error_msg(); } void TestEnv::init_buffer_pool(int64_t min_page_len, int64_t capacity, int64_t clean_pages_limit) { _exec_env->_buffer_pool = new BufferPool(min_page_len, capacity, clean_pages_limit); } TestEnv::~TestEnv() { SAFE_DELETE(_exec_env->_result_queue_mgr); SAFE_DELETE(_exec_env->_buffer_pool); SAFE_DELETE(_exec_env->_scan_thread_pool); SAFE_DELETE(_exec_env->_disk_io_mgr); SAFE_DELETE(_exec_env->_task_pool_mem_tracker_registry); SAFE_DELETE(_exec_env->_buffer_reservation); SAFE_DELETE(_exec_env->_thread_mgr); if (_engine == StorageEngine::_s_instance) { // the engine instance is created by this test env StorageEngine::_s_instance = nullptr; } SAFE_DELETE(_engine); } RuntimeState* TestEnv::create_runtime_state(int64_t query_id) { TExecPlanFragmentParams plan_params = TExecPlanFragmentParams(); plan_params.params.query_id.hi = 0; plan_params.params.query_id.lo = query_id; return new RuntimeState(plan_params.params, TQueryOptions(), TQueryGlobals(), _exec_env); } Status TestEnv::create_query_state(int64_t query_id, int max_buffers, int block_size, RuntimeState** runtime_state) { *runtime_state = create_runtime_state(query_id); if (*runtime_state == nullptr) { return Status::InternalError("Unexpected error creating RuntimeState"); } std::shared_ptr mgr; RETURN_IF_ERROR(BufferedBlockMgr2::create(*runtime_state, (*runtime_state)->runtime_profile(), _tmp_file_mgr.get(), block_size, &mgr)); (*runtime_state)->set_block_mgr2(mgr); // (*runtime_state)->_block_mgr = mgr; _query_states.push_back(std::shared_ptr(*runtime_state)); return Status::OK(); } Status TestEnv::create_query_states(int64_t start_query_id, int num_mgrs, int buffers_per_mgr, int block_size, std::vector* runtime_states) { for (int i = 0; i < num_mgrs; ++i) { RuntimeState* runtime_state = nullptr; RETURN_IF_ERROR(create_query_state(start_query_id + i, buffers_per_mgr, block_size, &runtime_state)); runtime_states->push_back(runtime_state); } return Status::OK(); } void TestEnv::tear_down_query_states() { _query_states.clear(); } int64_t TestEnv::calculate_mem_tracker(int max_buffers, int block_size) { DCHECK_GE(max_buffers, -1); if (max_buffers == -1) { return -1; } return max_buffers * static_cast(block_size); } void TestEnv::init_storage_engine(bool need_open, const std::vector& paths) { if (StorageEngine::_s_instance) { LOG(INFO) << "Engine instance already exists"; return; } // init and open storage engine doris::EngineOptions options; for (const auto& path : paths) { options.store_paths.emplace_back(path, -1); } options.backend_uid = UniqueId::gen_uid(); config::tablet_map_shard_size = 1; config::txn_map_shard_size = 1; config::txn_shard_size = 1; // This engine will be the singleton instance, cuz StorageEngine::_s_instance is nullptr now. Status st; if (need_open) { st = StorageEngine::open(options, &_engine); } else { _engine = new StorageEngine(options); } DCHECK(st.ok()) << st.get_error_msg(); _exec_env->set_storage_engine(_engine); } } // end namespace doris