diff --git a/be/src/olap/data_dir.cpp b/be/src/olap/data_dir.cpp index 23f1c8e96c..c2f591285f 100644 --- a/be/src/olap/data_dir.cpp +++ b/be/src/olap/data_dir.cpp @@ -116,6 +116,11 @@ Status DataDir::init() { return Status::OK(); } +void DataDir::stop_bg_worker() { + _stop_bg_worker = true; + _cv.notify_one(); +} + Status DataDir::_init_cluster_id() { std::string cluster_id_path = _path + CLUSTER_ID_PREFIX; if (access(cluster_id_path.c_str(), F_OK) != 0) { @@ -803,7 +808,10 @@ void DataDir::perform_path_gc_by_rowsetid() { // init the set of valid path // validate the path in data dir std::unique_lock lck(_check_path_mutex); - cv.wait(lck, [this]{return _all_check_paths.size() > 0;}); + _cv.wait(lck, [this] { return _stop_bg_worker || !_all_check_paths.empty(); }); + if (_stop_bg_worker) { + return; + } LOG(INFO) << "start to path gc by rowsetid."; int counter = 0; for (auto& path : _all_check_paths) { @@ -900,7 +908,7 @@ void DataDir::perform_path_scan() { } LOG(INFO) << "scan data dir path:" << _path << " finished. 
path size:" << _all_check_paths.size(); } - cv.notify_one(); + _cv.notify_one(); } void DataDir::_process_garbage_path(const std::string& path) { diff --git a/be/src/olap/data_dir.h b/be/src/olap/data_dir.h index 2c03de97b6..f1c0a8abda 100644 --- a/be/src/olap/data_dir.h +++ b/be/src/olap/data_dir.h @@ -49,6 +49,7 @@ public: ~DataDir(); Status init(); + void stop_bg_worker(); const std::string& path() const { return _path; } size_t path_hash() const { return _path_hash; } @@ -151,6 +152,8 @@ private: bool _check_pending_ids(const std::string& id); private: + bool _stop_bg_worker = false; + std::string _path; size_t _path_hash; // user specified capacity @@ -188,7 +191,7 @@ private: std::set _all_check_paths; std::mutex _check_path_mutex; - std::condition_variable cv; + std::condition_variable _cv; std::set _pending_path_ids; RWMutex _pending_path_mutex; diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp index 0699778402..ec60b0bef3 100644 --- a/be/src/olap/olap_server.cpp +++ b/be/src/olap/olap_server.cpp @@ -31,11 +31,23 @@ #include "olap/olap_define.h" #include "olap/storage_engine.h" #include "agent/cgroups_mgr.h" +#include "util/time.h" using std::string; namespace doris { +// TODO(yingchun): should be more graceful in the future refactor. 
+#define SLEEP_IN_BG_WORKER(seconds) \ + int64_t left_seconds = (seconds); \ + while (!_stop_bg_worker && left_seconds > 0) { \ + sleep(1); \ + --left_seconds; \ + } \ + if (_stop_bg_worker) { \ + break; \ + } + // number of running SCHEMA-CHANGE threads volatile uint32_t g_schema_change_active_threads = 0; @@ -154,8 +166,9 @@ void* StorageEngine::_fd_cache_clean_callback(void* arg) { "force set to 3600", interval); interval = 3600; } - while (true) { - sleep(interval); + while (!_stop_bg_worker) { + SLEEP_IN_BG_WORKER(interval); + _start_clean_fd_cache(); } @@ -175,7 +188,7 @@ void* StorageEngine::_base_compaction_thread_callback(void* arg, DataDir* data_d //string last_base_compaction_fs; //TTabletId last_base_compaction_tablet_id = -1; - while (true) { + while (!_stop_bg_worker) { // must be here, because this thread is start on start and // cgroup is not initialized at this time // add tid to cgroup @@ -184,7 +197,7 @@ void* StorageEngine::_base_compaction_thread_callback(void* arg, DataDir* data_d _perform_base_compaction(data_dir); } - usleep(interval * 1000000); + SLEEP_IN_BG_WORKER(interval); } return nullptr; @@ -210,7 +223,7 @@ void* StorageEngine::_garbage_sweeper_thread_callback(void* arg) { const double pi = 4 * std::atan(1); double usage = 1.0; // 程序启动后经过min_interval后触发第一轮扫描 - while (true) { + while (!_stop_bg_worker) { usage *= 100.0; // 该函数特性:当磁盘使用率<60%的时候,ratio接近于1; // 当使用率介于[60%, 75%]之间时,ratio急速从0.87降到0.27; @@ -222,7 +235,7 @@ void* StorageEngine::_garbage_sweeper_thread_callback(void* arg) { // 此时的特性,当usage<60%时,curr_interval的时间接近max_interval, // 当usage > 80%时,curr_interval接近min_interval curr_interval = curr_interval > min_interval ? 
curr_interval : min_interval; - sleep(curr_interval); + SLEEP_IN_BG_WORKER(curr_interval); // 开始清理,并得到清理后的磁盘使用率 OLAPStatus res = _start_trash_sweep(&usage); @@ -249,9 +262,9 @@ void* StorageEngine::_disk_stat_monitor_thread_callback(void* arg) { interval = 1; } - while (true) { + while (!_stop_bg_worker) { _start_disk_stat_monitor(); - sleep(interval); + SLEEP_IN_BG_WORKER(interval); } return nullptr; @@ -269,7 +282,7 @@ void* StorageEngine::_cumulative_compaction_thread_callback(void* arg, DataDir* interval = 1; } - while (true) { + while (!_stop_bg_worker) { // must be here, because this thread is start on start and // cgroup is not initialized at this time // add tid to cgroup @@ -277,7 +290,7 @@ void* StorageEngine::_cumulative_compaction_thread_callback(void* arg, DataDir* if (!data_dir->reach_capacity_limit(0)) { _perform_cumulative_compaction(data_dir); } - usleep(interval * 1000000); + SLEEP_IN_BG_WORKER(interval); } return nullptr; @@ -296,9 +309,77 @@ void* StorageEngine::_unused_rowset_monitor_thread_callback(void* arg) { interval = 1; } - while (true) { + while (!_stop_bg_worker) { start_delete_unused_rowset(); - sleep(interval); + SLEEP_IN_BG_WORKER(interval); + } + + return nullptr; +} + + + +void* StorageEngine::_path_gc_thread_callback(void* arg) { +#ifdef GOOGLE_PROFILER + ProfilerRegisterThread(); +#endif + + LOG(INFO) << "try to start path gc thread!"; + uint32_t interval = config::path_gc_check_interval_second; + if (interval <= 0) { + LOG(WARNING) << "path gc thread check interval config is illegal:" << interval + << "will be forced set to half hour"; + interval = 1800; // 0.5 hour + } + + while (!_stop_bg_worker) { + LOG(INFO) << "try to perform path gc!"; + // perform path gc by rowset id + ((DataDir*)arg)->perform_path_gc_by_rowsetid(); + SLEEP_IN_BG_WORKER(interval); + } + + return nullptr; +} + +void* StorageEngine::_path_scan_thread_callback(void* arg) { +#ifdef GOOGLE_PROFILER + ProfilerRegisterThread(); +#endif + + LOG(INFO) << "try to 
start path scan thread!"; + uint32_t interval = config::path_scan_interval_second; + if (interval <= 0) { + LOG(WARNING) << "path gc thread check interval config is illegal:" << interval + << "will be forced set to one day"; + interval = 24 * 3600; // one day + } + + while (!_stop_bg_worker) { + LOG(INFO) << "try to perform path scan!"; + ((DataDir*)arg)->perform_path_scan(); + SLEEP_IN_BG_WORKER(interval); + } + + return nullptr; +} + +void* StorageEngine::_tablet_checkpoint_callback(void* arg) { +#ifdef GOOGLE_PROFILER + ProfilerRegisterThread(); +#endif + LOG(INFO) << "try to start tablet meta checkpoint thread!"; + while (!_stop_bg_worker) { + LOG(INFO) << "begin to do tablet meta checkpoint:" << ((DataDir*)arg)->path(); + int64_t start_time = UnixMillis(); + _tablet_manager->do_tablet_meta_checkpoint((DataDir*)arg); + int64_t used_time = (UnixMillis() - start_time) / 1000; + if (used_time < config::tablet_meta_checkpoint_min_interval_secs) { + int64_t interval = config::tablet_meta_checkpoint_min_interval_secs - used_time; + SLEEP_IN_BG_WORKER(interval); + } else { + sleep(1); + } } return nullptr; diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp index b5530dcddd..4912cf37df 100644 --- a/be/src/olap/storage_engine.cpp +++ b/be/src/olap/storage_engine.cpp @@ -124,7 +124,7 @@ StorageEngine::StorageEngine(const EngineOptions& options) } StorageEngine::~StorageEngine() { - clear(); + _clear(); } void StorageEngine::load_data_dirs(const std::vector& data_dirs) { @@ -425,7 +425,7 @@ bool StorageEngine::_delete_tablets_on_unused_root_path() { return !tablet_info_vec.empty(); } -OLAPStatus StorageEngine::clear() { +void StorageEngine::_clear() { // 删除lru中所有内容,其实进程退出这么做本身意义不大,但对单测和更容易发现问题还是有很大意义的 delete FileHandler::get_fd_cache(); FileHandler::set_fd_cache(nullptr); @@ -433,12 +433,13 @@ OLAPStatus StorageEngine::clear() { std::lock_guard l(_store_lock); for (auto& store_pair : _store_map) { + store_pair.second->stop_bg_worker(); delete 
store_pair.second; store_pair.second = nullptr; } _store_map.clear(); - return OLAP_SUCCESS; + _stop_bg_worker = true; } void StorageEngine::clear_transaction_task(const TTransactionId transaction_id) { @@ -930,69 +931,4 @@ bool StorageEngine::check_rowset_id_in_unused_rowsets(const RowsetId& rowset_id) return false; } -void* StorageEngine::_path_gc_thread_callback(void* arg) { -#ifdef GOOGLE_PROFILER - ProfilerRegisterThread(); -#endif - - LOG(INFO) << "try to start path gc thread!"; - uint32_t interval = config::path_gc_check_interval_second; - if (interval <= 0) { - LOG(WARNING) << "path gc thread check interval config is illegal:" << interval - << "will be forced set to half hour"; - interval = 1800; // 0.5 hour - } - - while (true) { - LOG(INFO) << "try to perform path gc!"; - // perform path gc by rowset id - ((DataDir*)arg)->perform_path_gc_by_rowsetid(); - usleep(interval * 1000000); - } - - return nullptr; -} - -void* StorageEngine::_path_scan_thread_callback(void* arg) { -#ifdef GOOGLE_PROFILER - ProfilerRegisterThread(); -#endif - - LOG(INFO) << "try to start path scan thread!"; - uint32_t interval = config::path_scan_interval_second; - if (interval <= 0) { - LOG(WARNING) << "path gc thread check interval config is illegal:" << interval - << "will be forced set to one day"; - interval = 24 * 3600; // one day - } - - while (true) { - LOG(INFO) << "try to perform path scan!"; - ((DataDir*)arg)->perform_path_scan(); - usleep(interval * 1000000); - } - - return nullptr; -} - -void* StorageEngine::_tablet_checkpoint_callback(void* arg) { -#ifdef GOOGLE_PROFILER - ProfilerRegisterThread(); -#endif - LOG(INFO) << "try to start tablet meta checkpoint thread!"; - while (true) { - LOG(INFO) << "begin to do tablet meta checkpoint:" << ((DataDir*)arg)->path(); - int64_t start_time = UnixMillis(); - _tablet_manager->do_tablet_meta_checkpoint((DataDir*)arg); - int64_t used_time = (UnixMillis() - start_time) / 1000; - if (used_time < 
config::tablet_meta_checkpoint_min_interval_secs) { - sleep(config::tablet_meta_checkpoint_min_interval_secs - used_time); - } else { - sleep(1); - } - } - - return nullptr; -} - } // namespace doris diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h index 0e17182cd9..724b48c691 100644 --- a/be/src/olap/storage_engine.h +++ b/be/src/olap/storage_engine.h @@ -76,9 +76,6 @@ public: void clear_transaction_task(const TTransactionId transaction_id, const std::vector& partition_ids); - // Clear status(tables, ...) - OLAPStatus clear(); - // 获取cache的使用情况信息 void get_cache_status(rapidjson::Document* document) const; @@ -204,6 +201,9 @@ private: OLAPStatus _start_bg_worker(); + // Clear status(tables, ...) + void _clear(); + void _update_storage_medium_type_count(); // Some check methods @@ -302,6 +302,7 @@ private: Mutex _gc_mutex; std::unordered_map _unused_rowsets; + bool _stop_bg_worker = false; std::thread _unused_rowset_monitor_thread; // thread to monitor snapshot expiry std::thread _garbage_sweeper_thread; diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp index d456684f10..0fa5dc024f 100644 --- a/be/src/olap/tablet_manager.cpp +++ b/be/src/olap/tablet_manager.cpp @@ -67,8 +67,9 @@ static bool _cmp_tablet_by_create_time(const TabletSharedPtr& a, const TabletSha TabletManager::TabletManager(int32_t tablet_map_lock_shard_size) : _tablet_map_lock_shard_size(tablet_map_lock_shard_size), _last_update_stat_ms(0) { - _tablet_map_lock_array = new RWMutex[tablet_map_lock_shard_size]; - _tablet_map_array = new tablet_map_t[tablet_map_lock_shard_size]; + DCHECK_GT(_tablet_map_lock_shard_size, 0); + _tablet_map_lock_array = new RWMutex[_tablet_map_lock_shard_size]; + _tablet_map_array = new tablet_map_t[_tablet_map_lock_shard_size]; } TabletManager::~TabletManager() { diff --git a/be/test/olap/aggregate_func_test.cpp b/be/test/olap/aggregate_func_test.cpp index 044c077166..5c9d9b1744 100644 --- 
a/be/test/olap/aggregate_func_test.cpp +++ b/be/test/olap/aggregate_func_test.cpp @@ -37,6 +37,7 @@ public: template void test_min() { using CppType = typename CppTypeTraits::CppType; + static const size_t kValSize = sizeof(CppType) + 1; // '1' represent the leading bool flag. char buf[64]; std::unique_ptr tracker(new MemTracker(-1)); @@ -47,14 +48,14 @@ void test_min() { RowCursorCell dst(buf); // null { - char val_buf[16]; + char val_buf[kValSize]; *(bool*)val_buf = true; agg->init(&dst, val_buf, true, mem_pool.get(), &agg_object_pool); ASSERT_TRUE(*(bool*)(buf)); } // 100 { - char val_buf[16]; + char val_buf[kValSize]; *(bool*)val_buf = false; CppType val = 100; memcpy(val_buf + 1, &val, sizeof(CppType)); @@ -65,7 +66,7 @@ void test_min() { } // 200 { - char val_buf[16]; + char val_buf[kValSize]; *(bool*)val_buf = false; CppType val = 200; memcpy(val_buf + 1, &val, sizeof(CppType)); @@ -76,7 +77,7 @@ void test_min() { } // 50 { - char val_buf[16]; + char val_buf[kValSize]; *(bool*)val_buf = false; CppType val = 50; memcpy(val_buf + 1, &val, sizeof(CppType)); @@ -87,7 +88,7 @@ void test_min() { } // null { - char val_buf[16]; + char val_buf[kValSize]; *(bool*)val_buf = true; agg->update(&dst, val_buf, mem_pool.get()); ASSERT_FALSE(*(bool*)(buf)); @@ -111,6 +112,7 @@ TEST_F(AggregateFuncTest, min) { template void test_max() { using CppType = typename CppTypeTraits::CppType; + static const size_t kValSize = sizeof(CppType) + 1; // '1' represent the leading bool flag. 
char buf[64]; @@ -122,14 +124,14 @@ void test_max() { RowCursorCell dst(buf); // null { - char val_buf[16]; + char val_buf[kValSize]; *(bool*)val_buf = true; agg->init(&dst, val_buf, true, mem_pool.get(), &agg_object_pool); ASSERT_TRUE(*(bool*)(buf)); } // 100 { - char val_buf[16]; + char val_buf[kValSize]; *(bool*)val_buf = false; CppType val = 100; memcpy(val_buf + 1, &val, sizeof(CppType)); @@ -140,7 +142,7 @@ void test_max() { } // 200 { - char val_buf[16]; + char val_buf[kValSize]; *(bool*)val_buf = false; CppType val = 200; memcpy(val_buf + 1, &val, sizeof(CppType)); @@ -151,7 +153,7 @@ void test_max() { } // 50 { - char val_buf[16]; + char val_buf[kValSize]; *(bool*)val_buf = false; CppType val = 50; memcpy(val_buf + 1, &val, sizeof(CppType)); @@ -162,7 +164,7 @@ void test_max() { } // null { - char val_buf[16]; + char val_buf[kValSize]; *(bool*)val_buf = true; agg->update(&dst, val_buf, mem_pool.get()); ASSERT_FALSE(*(bool*)(buf)); @@ -185,6 +187,7 @@ TEST_F(AggregateFuncTest, max) { template void test_sum() { using CppType = typename CppTypeTraits::CppType; + static const size_t kValSize = sizeof(CppType) + 1; // '1' represent the leading bool flag. 
char buf[64]; RowCursorCell dst(buf); @@ -196,14 +199,14 @@ void test_sum() { // null { - char val_buf[16]; + char val_buf[kValSize]; *(bool*)val_buf = true; agg->init(&dst, val_buf, true, mem_pool.get(), &agg_object_pool); ASSERT_TRUE(*(bool*)(buf)); } // 100 { - char val_buf[16]; + char val_buf[kValSize]; *(bool*)val_buf = false; CppType val = 100; memcpy(val_buf + 1, &val, sizeof(CppType)); @@ -214,7 +217,7 @@ void test_sum() { } // 200 { - char val_buf[16]; + char val_buf[kValSize]; *(bool*)val_buf = false; CppType val = 200; memcpy(val_buf + 1, &val, sizeof(CppType)); @@ -225,7 +228,7 @@ void test_sum() { } // 50 { - char val_buf[16]; + char val_buf[kValSize]; *(bool*)val_buf = false; CppType val = 50; memcpy(val_buf + 1, &val, sizeof(CppType)); @@ -236,7 +239,7 @@ void test_sum() { } // null { - char val_buf[16]; + char val_buf[kValSize]; *(bool*)val_buf = true; agg->update(&dst, val_buf, mem_pool.get()); ASSERT_FALSE(*(bool*)(buf)); @@ -259,6 +262,7 @@ TEST_F(AggregateFuncTest, sum) { template void test_replace() { using CppType = typename CppTypeTraits::CppType; + static const size_t kValSize = sizeof(CppType) + 1; // '1' represent the leading bool flag. 
char buf[64]; RowCursorCell dst(buf); @@ -270,14 +274,14 @@ void test_replace() { // null { - char val_buf[16]; + char val_buf[kValSize]; *(bool*)val_buf = true; agg->init(&dst, val_buf, true, mem_pool.get(), &agg_object_pool); ASSERT_TRUE(*(bool*)(buf)); } // 100 { - char val_buf[16]; + char val_buf[kValSize]; *(bool*)val_buf = false; CppType val = 100; memcpy(val_buf + 1, &val, sizeof(CppType)); @@ -288,14 +292,14 @@ void test_replace() { } // null { - char val_buf[16]; + char val_buf[kValSize]; *(bool*)val_buf = true; agg->update(&dst, val_buf, mem_pool.get()); ASSERT_TRUE(*(bool*)(buf)); } // 50 { - char val_buf[16]; + char val_buf[kValSize]; *(bool*)val_buf = false; CppType val = 50; memcpy(val_buf + 1, &val, sizeof(CppType)); diff --git a/be/test/olap/delete_handler_test.cpp b/be/test/olap/delete_handler_test.cpp index ccfe0ca61d..ef8b2ba0fd 100644 --- a/be/test/olap/delete_handler_test.cpp +++ b/be/test/olap/delete_handler_test.cpp @@ -55,6 +55,7 @@ void set_up() { std::vector paths; paths.emplace_back(config::storage_root_path, -1); config::min_file_descriptor_number = 1000; + config::tablet_map_shard_size = 1; doris::EngineOptions options; options.store_paths = paths; @@ -184,9 +185,6 @@ protected: tablet.reset(); StorageEngine::instance()->tablet_manager()->drop_tablet( _create_tablet.tablet_id, _create_tablet.tablet_schema.schema_hash); - while (0 == access(_tablet_path.c_str(), F_OK)) { - sleep(1); - } ASSERT_TRUE(FileUtils::remove_all(config::storage_root_path).ok()); } @@ -293,9 +291,6 @@ protected: tablet.reset(); StorageEngine::instance()->tablet_manager()->drop_tablet( _create_tablet.tablet_id, _create_tablet.tablet_schema.schema_hash); - while (0 == access(_tablet_path.c_str(), F_OK)) { - sleep(1); - } ASSERT_TRUE(FileUtils::remove_all(config::storage_root_path).ok()); } @@ -634,9 +629,6 @@ protected: _delete_handler.finalize(); StorageEngine::instance()->tablet_manager()->drop_tablet( _create_tablet.tablet_id, 
_create_tablet.tablet_schema.schema_hash); - while (0 == access(_tablet_path.c_str(), F_OK)) { - sleep(1); - } ASSERT_TRUE(FileUtils::remove_all(config::storage_root_path).ok()); } diff --git a/be/test/olap/rowset/segment_v2/bloom_filter_index_reader_writer_test.cpp b/be/test/olap/rowset/segment_v2/bloom_filter_index_reader_writer_test.cpp index a802c0e4d4..1022825f07 100644 --- a/be/test/olap/rowset/segment_v2/bloom_filter_index_reader_writer_test.cpp +++ b/be/test/olap/rowset/segment_v2/bloom_filter_index_reader_writer_test.cpp @@ -33,14 +33,23 @@ namespace doris { namespace segment_v2 { +const std::string dname = "./ut_dir/bloom_filter_index_reader_writer_test"; + class BloomFilterIndexReaderWriterTest : public testing::Test { public: - BloomFilterIndexReaderWriterTest() { } - virtual ~BloomFilterIndexReaderWriterTest() { + virtual void SetUp() { + if (FileUtils::is_dir(dname)) { + std::set files; + ASSERT_TRUE(FileUtils::list_dirs_files(dname, nullptr, &files, Env::Default()).ok()); + for (const auto& file : files) { + Status s = Env::Default()->delete_file(dname + "/" + file); + ASSERT_TRUE(s.ok()) << s.to_string(); + } + ASSERT_TRUE(Env::Default()->delete_dir(dname).ok()); + } } }; -const std::string dname = "./ut_dir/bloom_filter_index_reader_writer_test"; template void write_bloom_filter_index_file(const std::string& file_name, const void* values, @@ -54,7 +63,7 @@ void write_bloom_filter_index_file(const std::string& file_name, const void* val std::unique_ptr wblock; fs::CreateBlockOptions opts({ fname }); Status st = fs::fs_util::block_mgr_for_ut()->create_block(opts, &wblock); - ASSERT_TRUE(st.ok()); + ASSERT_TRUE(st.ok()) << st.to_string(); std::unique_ptr bloom_filter_index_writer; BloomFilterOptions bf_options;