@ -519,13 +519,15 @@ add_subdirectory(${SRC_DIR}/plugin)
|
||||
|
||||
# Utility CMake function to make specifying tests and benchmarks less verbose
|
||||
FUNCTION(ADD_BE_TEST TEST_NAME)
|
||||
# use argn to add additional files
|
||||
set(ADDITIONAL_FILES ${ARGN})
|
||||
set(BUILD_OUTPUT_ROOT_DIRECTORY "${CMAKE_CURRENT_BINARY_DIR}/")
|
||||
# This gets the directory where the test is from (e.g. 'exprs' or 'runtime')
|
||||
get_filename_component(DIR_NAME ${CMAKE_CURRENT_SOURCE_DIR} NAME)
|
||||
get_filename_component(TEST_DIR_NAME ${TEST_NAME} PATH)
|
||||
get_filename_component(TEST_FILE_NAME ${TEST_NAME} NAME)
|
||||
|
||||
ADD_EXECUTABLE(${TEST_FILE_NAME} ${TEST_NAME}.cpp)
|
||||
ADD_EXECUTABLE(${TEST_FILE_NAME} ${TEST_NAME}.cpp ${ADDITIONAL_FILES})
|
||||
TARGET_LINK_LIBRARIES(${TEST_FILE_NAME} ${TEST_LINK_LIBS})
|
||||
SET_TARGET_PROPERTIES(${TEST_FILE_NAME} PROPERTIES COMPILE_FLAGS "-fno-access-control")
|
||||
if (NOT "${TEST_DIR_NAME}" STREQUAL "")
|
||||
|
||||
@ -140,6 +140,10 @@ StorageEngine::~StorageEngine() {
|
||||
DEREGISTER_HOOK_METRIC(unused_rowsets_count);
|
||||
DEREGISTER_HOOK_METRIC(compaction_mem_current_consumption);
|
||||
_clear();
|
||||
|
||||
if(_compaction_thread_pool){
|
||||
_compaction_thread_pool->shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
void StorageEngine::load_data_dirs(const std::vector<DataDir*>& data_dirs) {
|
||||
|
||||
@ -67,7 +67,6 @@ set(RUNTIME_FILES
|
||||
disk_io_mgr_reader_context.cc
|
||||
disk_io_mgr_scan_range.cc
|
||||
buffered_block_mgr2.cc
|
||||
test_env.cc
|
||||
mem_tracker.cpp
|
||||
spill_sorter.cc
|
||||
sorted_run_merger.cc
|
||||
|
||||
@ -404,9 +404,8 @@ Status DiskIoMgr::init(const std::shared_ptr<MemTracker>& process_mem_tracker) {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status DiskIoMgr::register_context(RequestContext** request_context,
|
||||
std::shared_ptr<MemTracker> mem_tracker) {
|
||||
DCHECK(_request_context_cache.get() != NULL) << "Must call init() first.";
|
||||
Status DiskIoMgr::register_context(RequestContext** request_context, std::shared_ptr<MemTracker> mem_tracker) {
|
||||
DCHECK(_request_context_cache) << "Must call init() first.";
|
||||
*request_context = _request_context_cache->get_new_context();
|
||||
(*request_context)->reset(std::move(mem_tracker));
|
||||
return Status::OK();
|
||||
|
||||
@ -361,8 +361,6 @@ public:
|
||||
int64_t get_load_mem_limit();
|
||||
|
||||
private:
|
||||
// Allow TestEnv to set block_mgr manually for testing.
|
||||
friend class TestEnv;
|
||||
|
||||
// Use a custom block manager for the query for testing purposes.
|
||||
void set_block_mgr2(const boost::shared_ptr<BufferedBlockMgr2>& block_mgr) {
|
||||
|
||||
@ -1,100 +0,0 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
#include "runtime/test_env.h"
|
||||
|
||||
#include "util/disk_info.h"
|
||||
#include "util/doris_metrics.h"
|
||||
|
||||
using boost::shared_ptr;
|
||||
|
||||
namespace doris {
|
||||
|
||||
TestEnv::TestEnv() {
|
||||
_exec_env.reset(new ExecEnv());
|
||||
// _exec_env->init_for_tests();
|
||||
_io_mgr_tracker.reset(new MemTracker(-1));
|
||||
_block_mgr_parent_tracker.reset(new MemTracker(-1));
|
||||
_exec_env->disk_io_mgr()->init(_io_mgr_tracker);
|
||||
_tmp_file_mgr.reset(new TmpFileMgr());
|
||||
_tmp_file_mgr->init();
|
||||
}
|
||||
|
||||
void TestEnv::init_tmp_file_mgr(const std::vector<std::string>& tmp_dirs, bool one_dir_per_device) {
|
||||
// Need to recreate metrics to avoid error when registering metric twice.
|
||||
_tmp_file_mgr.reset(new TmpFileMgr());
|
||||
_tmp_file_mgr->init_custom(tmp_dirs, one_dir_per_device);
|
||||
}
|
||||
|
||||
TestEnv::~TestEnv() {
|
||||
// Queries must be torn down first since they are dependent on global state.
|
||||
tear_down_query_states();
|
||||
_block_mgr_parent_tracker.reset();
|
||||
_exec_env.reset();
|
||||
_io_mgr_tracker.reset();
|
||||
_tmp_file_mgr.reset();
|
||||
}
|
||||
|
||||
RuntimeState* TestEnv::create_runtime_state(int64_t query_id) {
|
||||
TExecPlanFragmentParams plan_params = TExecPlanFragmentParams();
|
||||
plan_params.params.query_id.hi = 0;
|
||||
plan_params.params.query_id.lo = query_id;
|
||||
return new RuntimeState(plan_params.params, TQueryOptions(), TQueryGlobals(), _exec_env.get());
|
||||
}
|
||||
|
||||
Status TestEnv::create_query_state(int64_t query_id, int max_buffers, int block_size,
|
||||
RuntimeState** runtime_state) {
|
||||
*runtime_state = create_runtime_state(query_id);
|
||||
if (*runtime_state == NULL) {
|
||||
return Status::InternalError("Unexpected error creating RuntimeState");
|
||||
}
|
||||
|
||||
shared_ptr<BufferedBlockMgr2> mgr;
|
||||
RETURN_IF_ERROR(BufferedBlockMgr2::create(
|
||||
*runtime_state, _block_mgr_parent_tracker, (*runtime_state)->runtime_profile(),
|
||||
_tmp_file_mgr.get(), calculate_mem_tracker(max_buffers, block_size), block_size, &mgr));
|
||||
(*runtime_state)->set_block_mgr2(mgr);
|
||||
// (*runtime_state)->_block_mgr = mgr;
|
||||
|
||||
_query_states.push_back(shared_ptr<RuntimeState>(*runtime_state));
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status TestEnv::create_query_states(int64_t start_query_id, int num_mgrs, int buffers_per_mgr,
|
||||
int block_size, std::vector<RuntimeState*>* runtime_states) {
|
||||
for (int i = 0; i < num_mgrs; ++i) {
|
||||
RuntimeState* runtime_state = NULL;
|
||||
RETURN_IF_ERROR(create_query_state(start_query_id + i, buffers_per_mgr, block_size,
|
||||
&runtime_state));
|
||||
runtime_states->push_back(runtime_state);
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
void TestEnv::tear_down_query_states() {
|
||||
_query_states.clear();
|
||||
}
|
||||
|
||||
int64_t TestEnv::calculate_mem_tracker(int max_buffers, int block_size) {
|
||||
DCHECK_GE(max_buffers, -1);
|
||||
if (max_buffers == -1) {
|
||||
return -1;
|
||||
}
|
||||
return max_buffers * static_cast<int64_t>(block_size);
|
||||
}
|
||||
|
||||
} // end namespace doris
|
||||
@ -170,7 +170,7 @@ private:
|
||||
bool is_blacklisted(DeviceId device_id);
|
||||
|
||||
ExecEnv* _exec_env;
|
||||
bool _initialized;
|
||||
bool _initialized = false;
|
||||
|
||||
// Protects the status of tmp dirs (i.e. whether they're blacklisted).
|
||||
SpinLock _dir_status_lock;
|
||||
|
||||
@ -57,7 +57,7 @@ ADD_BE_TEST(small_file_mgr_test)
|
||||
ADD_BE_TEST(heartbeat_flags_test)
|
||||
|
||||
ADD_BE_TEST(result_queue_mgr_test)
|
||||
ADD_BE_TEST(memory_scratch_sink_test)
|
||||
ADD_BE_TEST(memory_scratch_sink_test test_env.cc)
|
||||
ADD_BE_TEST(external_scan_context_mgr_test)
|
||||
|
||||
ADD_BE_TEST(memory/chunk_allocator_test)
|
||||
|
||||
@ -33,14 +33,11 @@
|
||||
#include "gen_cpp/Types_types.h"
|
||||
#include "olap/options.h"
|
||||
#include "olap/row.h"
|
||||
#include "runtime/bufferpool/reservation_tracker.h"
|
||||
#include "runtime/exec_env.h"
|
||||
#include "runtime/mem_tracker.h"
|
||||
#include "runtime/primitive_type.h"
|
||||
#include "runtime/result_queue_mgr.h"
|
||||
#include "runtime/row_batch.h"
|
||||
#include "runtime/runtime_state.h"
|
||||
#include "runtime/thread_resource_mgr.h"
|
||||
#include "runtime/test_env.h"
|
||||
#include "runtime/tuple_row.h"
|
||||
#include "testutil/desc_tbl_builder.h"
|
||||
#include "util/blocking_queue.hpp"
|
||||
@ -51,6 +48,7 @@ namespace doris {
|
||||
class MemoryScratchSinkTest : public testing::Test {
|
||||
public:
|
||||
MemoryScratchSinkTest() {
|
||||
_env = std::make_shared<TestEnv>();
|
||||
{
|
||||
TExpr expr;
|
||||
{
|
||||
@ -67,12 +65,7 @@ public:
|
||||
}
|
||||
}
|
||||
|
||||
~MemoryScratchSinkTest() {
|
||||
delete _state;
|
||||
delete _exec_env->_result_queue_mgr;
|
||||
delete _exec_env->_thread_mgr;
|
||||
delete _exec_env->_buffer_reservation;
|
||||
}
|
||||
~MemoryScratchSinkTest() { delete _state; }
|
||||
|
||||
virtual void SetUp() {
|
||||
config::periodic_counter_update_period_ms = 500;
|
||||
@ -96,7 +89,7 @@ public:
|
||||
|
||||
private:
|
||||
ObjectPool _obj_pool;
|
||||
ExecEnv* _exec_env = nullptr;
|
||||
std::shared_ptr<TestEnv> _env;
|
||||
// std::vector<TExpr> _exprs;
|
||||
TDescriptorTable _t_desc_table;
|
||||
RuntimeState* _state = nullptr;
|
||||
@ -109,21 +102,17 @@ private:
|
||||
};
|
||||
|
||||
void MemoryScratchSinkTest::init() {
|
||||
_exec_env = ExecEnv::GetInstance();
|
||||
init_desc_tbl();
|
||||
init_runtime_state();
|
||||
}
|
||||
|
||||
void MemoryScratchSinkTest::init_runtime_state() {
|
||||
_exec_env->_result_queue_mgr = new ResultQueueMgr();
|
||||
_exec_env->_thread_mgr = new ThreadResourceMgr();
|
||||
_exec_env->_buffer_reservation = new ReservationTracker();
|
||||
TQueryOptions query_options;
|
||||
query_options.batch_size = 1024;
|
||||
TUniqueId query_id;
|
||||
query_id.lo = 10;
|
||||
query_id.hi = 100;
|
||||
_state = new RuntimeState(query_id, query_options, TQueryGlobals(), _exec_env);
|
||||
_state = new RuntimeState(query_id, query_options, TQueryGlobals(), _env->exec_env());
|
||||
_state->init_instance_mem_tracker();
|
||||
_mem_tracker =
|
||||
MemTracker::CreateTracker(-1, "MemoryScratchSinkTest", _state->instance_mem_tracker());
|
||||
|
||||
151
be/test/runtime/test_env.cc
Normal file
151
be/test/runtime/test_env.cc
Normal file
@ -0,0 +1,151 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
#include "runtime/test_env.h"
|
||||
|
||||
#include <sys/stat.h>
|
||||
|
||||
#include <memory>
|
||||
|
||||
#include "olap/storage_engine.h"
|
||||
#include "runtime/fragment_mgr.h"
|
||||
#include "runtime/initial_reservations.h"
|
||||
#include "runtime/result_queue_mgr.h"
|
||||
#include "util/disk_info.h"
|
||||
#include "util/priority_thread_pool.hpp"
|
||||
|
||||
namespace doris {
|
||||
|
||||
TestEnv::TestEnv()
|
||||
: _block_mgr_parent_tracker(MemTracker::CreateTracker(-1, "block mgr parent")),
|
||||
_io_mgr_tracker(MemTracker::CreateTracker(-1, "io mgr")) {
|
||||
// Some code will use ExecEnv::GetInstance(), so init the global ExecEnv singleton
|
||||
_exec_env = ExecEnv::GetInstance();
|
||||
_exec_env->_thread_mgr = new ThreadResourceMgr(2);
|
||||
_exec_env->_buffer_reservation = new ReservationTracker();
|
||||
_exec_env->_mem_tracker = MemTracker::CreateTracker(-1, "TestEnv root");
|
||||
_exec_env->_disk_io_mgr = new DiskIoMgr(1, 1, 1, 10);
|
||||
_exec_env->disk_io_mgr()->init(_io_mgr_tracker);
|
||||
_exec_env->_thread_pool = new PriorityThreadPool(1, 16);
|
||||
_exec_env->_result_queue_mgr = new ResultQueueMgr();
|
||||
// TODO may need rpc support, etc.
|
||||
}
|
||||
|
||||
void TestEnv::init_tmp_file_mgr(const std::vector<std::string>& tmp_dirs, bool one_dir_per_device) {
|
||||
_tmp_file_mgr = std::make_shared<TmpFileMgr>();
|
||||
_exec_env->_tmp_file_mgr = _tmp_file_mgr.get();
|
||||
|
||||
DiskInfo::init();
|
||||
// will use DiskInfo::num_disks(), DiskInfo should be initialized before
|
||||
auto st = _tmp_file_mgr->init_custom(tmp_dirs, one_dir_per_device);
|
||||
DCHECK(st.ok()) << st.get_error_msg();
|
||||
}
|
||||
|
||||
void TestEnv::init_buffer_pool(int64_t min_page_len, int64_t capacity, int64_t clean_pages_limit) {
|
||||
_exec_env->_buffer_pool = new BufferPool(min_page_len, capacity, clean_pages_limit);
|
||||
}
|
||||
|
||||
TestEnv::~TestEnv() {
|
||||
SAFE_DELETE(_exec_env->_result_queue_mgr);
|
||||
SAFE_DELETE(_exec_env->_buffer_pool);
|
||||
SAFE_DELETE(_exec_env->_thread_pool);
|
||||
SAFE_DELETE(_exec_env->_disk_io_mgr);
|
||||
SAFE_DELETE(_exec_env->_buffer_reservation);
|
||||
SAFE_DELETE(_exec_env->_thread_mgr);
|
||||
|
||||
if (_engine == StorageEngine::_s_instance) {
|
||||
// the engine instance is created by this test env
|
||||
StorageEngine::_s_instance = nullptr;
|
||||
}
|
||||
SAFE_DELETE(_engine);
|
||||
}
|
||||
|
||||
RuntimeState* TestEnv::create_runtime_state(int64_t query_id) {
|
||||
TExecPlanFragmentParams plan_params = TExecPlanFragmentParams();
|
||||
plan_params.params.query_id.hi = 0;
|
||||
plan_params.params.query_id.lo = query_id;
|
||||
return new RuntimeState(plan_params.params, TQueryOptions(), TQueryGlobals(), _exec_env);
|
||||
}
|
||||
|
||||
Status TestEnv::create_query_state(int64_t query_id, int max_buffers, int block_size,
|
||||
RuntimeState** runtime_state) {
|
||||
*runtime_state = create_runtime_state(query_id);
|
||||
if (*runtime_state == nullptr) {
|
||||
return Status::InternalError("Unexpected error creating RuntimeState");
|
||||
}
|
||||
|
||||
boost::shared_ptr<BufferedBlockMgr2> mgr;
|
||||
RETURN_IF_ERROR(BufferedBlockMgr2::create(
|
||||
*runtime_state, _block_mgr_parent_tracker, (*runtime_state)->runtime_profile(),
|
||||
_tmp_file_mgr.get(), calculate_mem_tracker(max_buffers, block_size), block_size, &mgr));
|
||||
(*runtime_state)->set_block_mgr2(mgr);
|
||||
// (*runtime_state)->_block_mgr = mgr;
|
||||
|
||||
_query_states.push_back(std::shared_ptr<RuntimeState>(*runtime_state));
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status TestEnv::create_query_states(int64_t start_query_id, int num_mgrs, int buffers_per_mgr,
|
||||
int block_size, std::vector<RuntimeState*>* runtime_states) {
|
||||
for (int i = 0; i < num_mgrs; ++i) {
|
||||
RuntimeState* runtime_state = nullptr;
|
||||
RETURN_IF_ERROR(create_query_state(start_query_id + i, buffers_per_mgr, block_size,
|
||||
&runtime_state));
|
||||
runtime_states->push_back(runtime_state);
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
void TestEnv::tear_down_query_states() {
|
||||
_query_states.clear();
|
||||
}
|
||||
|
||||
int64_t TestEnv::calculate_mem_tracker(int max_buffers, int block_size) {
|
||||
DCHECK_GE(max_buffers, -1);
|
||||
if (max_buffers == -1) {
|
||||
return -1;
|
||||
}
|
||||
return max_buffers * static_cast<int64_t>(block_size);
|
||||
}
|
||||
|
||||
void TestEnv::init_storage_engine(bool need_open, const std::vector<std::string>& paths) {
|
||||
if (StorageEngine::_s_instance) {
|
||||
LOG(INFO) << "Engine instance already exists";
|
||||
return;
|
||||
}
|
||||
// init and open storage engine
|
||||
doris::EngineOptions options;
|
||||
for (const auto& path : paths) {
|
||||
options.store_paths.emplace_back(path, -1);
|
||||
}
|
||||
options.backend_uid = UniqueId::gen_uid();
|
||||
config::tablet_map_shard_size = 1;
|
||||
config::txn_map_shard_size = 1;
|
||||
config::txn_shard_size = 1;
|
||||
|
||||
// This engine will be the singleton instance, cuz StorageEngine::_s_instance is nullptr now.
|
||||
Status st;
|
||||
if (need_open) {
|
||||
st = StorageEngine::open(options, &_engine);
|
||||
} else {
|
||||
_engine = new StorageEngine(options);
|
||||
}
|
||||
DCHECK(st.ok()) << st.get_error_msg();
|
||||
_exec_env->set_storage_engine(_engine);
|
||||
}
|
||||
|
||||
} // end namespace doris
|
||||
@ -25,8 +25,9 @@
|
||||
|
||||
namespace doris {
|
||||
|
||||
// Helper testing class that creates an environment with a buffered-block-mgr similar
|
||||
// to the one Impala's runtime is using.
|
||||
/// Helper testing class that creates an environment with runtime memory management
|
||||
/// similar to the one used by the Doris runtime. Only one TestEnv can be active at a
|
||||
/// time, because it modifies the global ExecEnv singleton.
|
||||
class TestEnv {
|
||||
public:
|
||||
TestEnv();
|
||||
@ -36,6 +37,11 @@ public:
|
||||
// query states have been created.
|
||||
void init_tmp_file_mgr(const std::vector<std::string>& tmp_dirs, bool one_dir_per_device);
|
||||
|
||||
void init_buffer_pool(int64_t min_page_len, int64_t capacity, int64_t clean_pages_limit);
|
||||
|
||||
// If don't need to open, paths can be empty.
|
||||
void init_storage_engine(bool need_open, const std::vector<std::string>& paths = {});
|
||||
|
||||
// Create a RuntimeState for a query with a new block manager. The RuntimeState is
|
||||
// owned by the TestEnv.
|
||||
Status create_query_state(int64_t query_id, int max_buffers, int block_size,
|
||||
@ -51,28 +57,26 @@ public:
|
||||
|
||||
// Calculate memory limit accounting for overflow and negative values.
|
||||
// If max_buffers is -1, no memory limit will apply.
|
||||
int64_t calculate_mem_tracker(int max_buffers, int block_size);
|
||||
static int64_t calculate_mem_tracker(int max_buffers, int block_size);
|
||||
|
||||
ExecEnv* exec_env() { return _exec_env.get(); }
|
||||
ExecEnv* exec_env() { return _exec_env; }
|
||||
std::shared_ptr<MemTracker> block_mgr_parent_tracker() { return _block_mgr_parent_tracker; }
|
||||
MemTracker* io_mgr_tracker() { return _io_mgr_tracker.get(); }
|
||||
TmpFileMgr* tmp_file_mgr() { return _tmp_file_mgr.get(); }
|
||||
|
||||
private:
|
||||
// Recreate global metric groups.
|
||||
void init_metrics();
|
||||
|
||||
// Create a new RuntimeState sharing global environment.
|
||||
RuntimeState* create_runtime_state(int64_t query_id);
|
||||
|
||||
// Global state for test environment.
|
||||
boost::scoped_ptr<ExecEnv> _exec_env;
|
||||
ExecEnv* _exec_env;
|
||||
std::shared_ptr<MemTracker> _block_mgr_parent_tracker;
|
||||
std::shared_ptr<MemTracker> _io_mgr_tracker;
|
||||
boost::scoped_ptr<TmpFileMgr> _tmp_file_mgr;
|
||||
std::shared_ptr<TmpFileMgr> _tmp_file_mgr;
|
||||
|
||||
// Per-query states with associated block managers.
|
||||
std::vector<boost::shared_ptr<RuntimeState>> _query_states;
|
||||
std::vector<std::shared_ptr<RuntimeState> > _query_states;
|
||||
|
||||
StorageEngine* _engine = nullptr;
|
||||
};
|
||||
|
||||
} // end namespace doris
|
||||
Reference in New Issue
Block a user