[UT] Fix some BE unit tests (#3110)

Also support graceful exit for StorageEngine to avoid hanging for too
long in unit tests.
This commit is contained in:
Yingchun Lai
2020-03-16 13:31:44 +08:00
committed by GitHub
parent f4b028915b
commit 64a06ea9d4
9 changed files with 155 additions and 120 deletions

View File

@ -116,6 +116,11 @@ Status DataDir::init() {
return Status::OK();
}
void DataDir::stop_bg_worker() {
_stop_bg_worker = true;
_cv.notify_one();
}
Status DataDir::_init_cluster_id() {
std::string cluster_id_path = _path + CLUSTER_ID_PREFIX;
if (access(cluster_id_path.c_str(), F_OK) != 0) {
@ -803,7 +808,10 @@ void DataDir::perform_path_gc_by_rowsetid() {
// init the set of valid path
// validate the path in data dir
std::unique_lock<std::mutex> lck(_check_path_mutex);
cv.wait(lck, [this]{return _all_check_paths.size() > 0;});
_cv.wait(lck, [this] { return _stop_bg_worker || !_all_check_paths.empty(); });
if (_stop_bg_worker) {
return;
}
LOG(INFO) << "start to path gc by rowsetid.";
int counter = 0;
for (auto& path : _all_check_paths) {
@ -900,7 +908,7 @@ void DataDir::perform_path_scan() {
}
LOG(INFO) << "scan data dir path:" << _path << " finished. path size:" << _all_check_paths.size();
}
cv.notify_one();
_cv.notify_one();
}
void DataDir::_process_garbage_path(const std::string& path) {

View File

@ -49,6 +49,7 @@ public:
~DataDir();
Status init();
void stop_bg_worker();
const std::string& path() const { return _path; }
size_t path_hash() const { return _path_hash; }
@ -151,6 +152,8 @@ private:
bool _check_pending_ids(const std::string& id);
private:
bool _stop_bg_worker = false;
std::string _path;
size_t _path_hash;
// user specified capacity
@ -188,7 +191,7 @@ private:
std::set<std::string> _all_check_paths;
std::mutex _check_path_mutex;
std::condition_variable cv;
std::condition_variable _cv;
std::set<std::string> _pending_path_ids;
RWMutex _pending_path_mutex;

View File

@ -31,11 +31,23 @@
#include "olap/olap_define.h"
#include "olap/storage_engine.h"
#include "agent/cgroups_mgr.h"
#include "util/time.h"
using std::string;
namespace doris {
// TODO(yingchun): should be more graceful in the future refactor.
//
// Sleeps for `seconds` seconds in one-second steps, so the wait ends early once
// _stop_bg_worker becomes true, and then `break`s out of the enclosing loop if
// _stop_bg_worker is set. It must be expanded inside a loop body, in a scope
// where _stop_bg_worker is visible.
//
// The countdown variable lives in its own block so the macro can be expanded
// more than once in the same scope without redeclaring `left_seconds`.
#define SLEEP_IN_BG_WORKER(seconds)                    \
    {                                                  \
        int64_t left_seconds = (seconds);              \
        while (!_stop_bg_worker && left_seconds > 0) { \
            sleep(1);                                  \
            --left_seconds;                            \
        }                                              \
    }                                                  \
    if (_stop_bg_worker) {                             \
        break;                                         \
    }
// number of running SCHEMA-CHANGE threads
volatile uint32_t g_schema_change_active_threads = 0;
@ -154,8 +166,9 @@ void* StorageEngine::_fd_cache_clean_callback(void* arg) {
"force set to 3600", interval);
interval = 3600;
}
while (true) {
sleep(interval);
while (!_stop_bg_worker) {
SLEEP_IN_BG_WORKER(interval);
_start_clean_fd_cache();
}
@ -175,7 +188,7 @@ void* StorageEngine::_base_compaction_thread_callback(void* arg, DataDir* data_d
//string last_base_compaction_fs;
//TTabletId last_base_compaction_tablet_id = -1;
while (true) {
while (!_stop_bg_worker) {
// must be here, because this thread is start on start and
// cgroup is not initialized at this time
// add tid to cgroup
@ -184,7 +197,7 @@ void* StorageEngine::_base_compaction_thread_callback(void* arg, DataDir* data_d
_perform_base_compaction(data_dir);
}
usleep(interval * 1000000);
SLEEP_IN_BG_WORKER(interval);
}
return nullptr;
@ -210,7 +223,7 @@ void* StorageEngine::_garbage_sweeper_thread_callback(void* arg) {
const double pi = 4 * std::atan(1);
double usage = 1.0;
// The first sweep round is triggered min_interval after the process starts
while (true) {
while (!_stop_bg_worker) {
usage *= 100.0;
// Property of this function: when disk usage is < 60%, ratio is close to 1;
// when usage is within [60%, 75%], ratio drops sharply from 0.87 to 0.27;
@ -222,7 +235,7 @@ void* StorageEngine::_garbage_sweeper_thread_callback(void* arg) {
// With this, when usage < 60%, curr_interval is close to max_interval;
// when usage > 80%, curr_interval is close to min_interval
curr_interval = curr_interval > min_interval ? curr_interval : min_interval;
sleep(curr_interval);
SLEEP_IN_BG_WORKER(curr_interval);
// Start the sweep and obtain the disk usage after it
OLAPStatus res = _start_trash_sweep(&usage);
@ -249,9 +262,9 @@ void* StorageEngine::_disk_stat_monitor_thread_callback(void* arg) {
interval = 1;
}
while (true) {
while (!_stop_bg_worker) {
_start_disk_stat_monitor();
sleep(interval);
SLEEP_IN_BG_WORKER(interval);
}
return nullptr;
@ -269,7 +282,7 @@ void* StorageEngine::_cumulative_compaction_thread_callback(void* arg, DataDir*
interval = 1;
}
while (true) {
while (!_stop_bg_worker) {
// must be here, because this thread is start on start and
// cgroup is not initialized at this time
// add tid to cgroup
@ -277,7 +290,7 @@ void* StorageEngine::_cumulative_compaction_thread_callback(void* arg, DataDir*
if (!data_dir->reach_capacity_limit(0)) {
_perform_cumulative_compaction(data_dir);
}
usleep(interval * 1000000);
SLEEP_IN_BG_WORKER(interval);
}
return nullptr;
@ -296,9 +309,77 @@ void* StorageEngine::_unused_rowset_monitor_thread_callback(void* arg) {
interval = 1;
}
while (true) {
while (!_stop_bg_worker) {
start_delete_unused_rowset();
sleep(interval);
SLEEP_IN_BG_WORKER(interval);
}
return nullptr;
}
void* StorageEngine::_path_gc_thread_callback(void* arg) {
#ifdef GOOGLE_PROFILER
ProfilerRegisterThread();
#endif
LOG(INFO) << "try to start path gc thread!";
uint32_t interval = config::path_gc_check_interval_second;
if (interval <= 0) {
LOG(WARNING) << "path gc thread check interval config is illegal:" << interval
<< "will be forced set to half hour";
interval = 1800; // 0.5 hour
}
while (!_stop_bg_worker) {
LOG(INFO) << "try to perform path gc!";
// perform path gc by rowset id
((DataDir*)arg)->perform_path_gc_by_rowsetid();
SLEEP_IN_BG_WORKER(interval);
}
return nullptr;
}
void* StorageEngine::_path_scan_thread_callback(void* arg) {
#ifdef GOOGLE_PROFILER
ProfilerRegisterThread();
#endif
LOG(INFO) << "try to start path scan thread!";
uint32_t interval = config::path_scan_interval_second;
if (interval <= 0) {
LOG(WARNING) << "path gc thread check interval config is illegal:" << interval
<< "will be forced set to one day";
interval = 24 * 3600; // one day
}
while (!_stop_bg_worker) {
LOG(INFO) << "try to perform path scan!";
((DataDir*)arg)->perform_path_scan();
SLEEP_IN_BG_WORKER(interval);
}
return nullptr;
}
void* StorageEngine::_tablet_checkpoint_callback(void* arg) {
#ifdef GOOGLE_PROFILER
ProfilerRegisterThread();
#endif
LOG(INFO) << "try to start tablet meta checkpoint thread!";
while (!_stop_bg_worker) {
LOG(INFO) << "begin to do tablet meta checkpoint:" << ((DataDir*)arg)->path();
int64_t start_time = UnixMillis();
_tablet_manager->do_tablet_meta_checkpoint((DataDir*)arg);
int64_t used_time = (UnixMillis() - start_time) / 1000;
if (used_time < config::tablet_meta_checkpoint_min_interval_secs) {
int64_t interval = config::tablet_meta_checkpoint_min_interval_secs - used_time;
SLEEP_IN_BG_WORKER(interval);
} else {
sleep(1);
}
}
return nullptr;

View File

@ -124,7 +124,7 @@ StorageEngine::StorageEngine(const EngineOptions& options)
}
StorageEngine::~StorageEngine() {
clear();
_clear();
}
void StorageEngine::load_data_dirs(const std::vector<DataDir*>& data_dirs) {
@ -425,7 +425,7 @@ bool StorageEngine::_delete_tablets_on_unused_root_path() {
return !tablet_info_vec.empty();
}
OLAPStatus StorageEngine::clear() {
void StorageEngine::_clear() {
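// Delete everything in the LRU cache. This matters little when the process is exiting anyway, but it is valuable for unit tests and makes problems easier to detect.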
delete FileHandler::get_fd_cache();
FileHandler::set_fd_cache(nullptr);
@ -433,12 +433,13 @@ OLAPStatus StorageEngine::clear() {
std::lock_guard<std::mutex> l(_store_lock);
for (auto& store_pair : _store_map) {
store_pair.second->stop_bg_worker();
delete store_pair.second;
store_pair.second = nullptr;
}
_store_map.clear();
return OLAP_SUCCESS;
_stop_bg_worker = true;
}
void StorageEngine::clear_transaction_task(const TTransactionId transaction_id) {
@ -930,69 +931,4 @@ bool StorageEngine::check_rowset_id_in_unused_rowsets(const RowsetId& rowset_id)
return false;
}
void* StorageEngine::_path_gc_thread_callback(void* arg) {
#ifdef GOOGLE_PROFILER
ProfilerRegisterThread();
#endif
LOG(INFO) << "try to start path gc thread!";
uint32_t interval = config::path_gc_check_interval_second;
if (interval <= 0) {
LOG(WARNING) << "path gc thread check interval config is illegal:" << interval
<< "will be forced set to half hour";
interval = 1800; // 0.5 hour
}
while (true) {
LOG(INFO) << "try to perform path gc!";
// perform path gc by rowset id
((DataDir*)arg)->perform_path_gc_by_rowsetid();
usleep(interval * 1000000);
}
return nullptr;
}
void* StorageEngine::_path_scan_thread_callback(void* arg) {
#ifdef GOOGLE_PROFILER
ProfilerRegisterThread();
#endif
LOG(INFO) << "try to start path scan thread!";
uint32_t interval = config::path_scan_interval_second;
if (interval <= 0) {
LOG(WARNING) << "path gc thread check interval config is illegal:" << interval
<< "will be forced set to one day";
interval = 24 * 3600; // one day
}
while (true) {
LOG(INFO) << "try to perform path scan!";
((DataDir*)arg)->perform_path_scan();
usleep(interval * 1000000);
}
return nullptr;
}
void* StorageEngine::_tablet_checkpoint_callback(void* arg) {
#ifdef GOOGLE_PROFILER
ProfilerRegisterThread();
#endif
LOG(INFO) << "try to start tablet meta checkpoint thread!";
while (true) {
LOG(INFO) << "begin to do tablet meta checkpoint:" << ((DataDir*)arg)->path();
int64_t start_time = UnixMillis();
_tablet_manager->do_tablet_meta_checkpoint((DataDir*)arg);
int64_t used_time = (UnixMillis() - start_time) / 1000;
if (used_time < config::tablet_meta_checkpoint_min_interval_secs) {
sleep(config::tablet_meta_checkpoint_min_interval_secs - used_time);
} else {
sleep(1);
}
}
return nullptr;
}
} // namespace doris

View File

@ -76,9 +76,6 @@ public:
void clear_transaction_task(const TTransactionId transaction_id,
const std::vector<TPartitionId>& partition_ids);
// Clear status(tables, ...)
OLAPStatus clear();
// Get cache usage information
void get_cache_status(rapidjson::Document* document) const;
@ -204,6 +201,9 @@ private:
OLAPStatus _start_bg_worker();
// Clear status(tables, ...)
void _clear();
void _update_storage_medium_type_count();
// Some check methods
@ -302,6 +302,7 @@ private:
Mutex _gc_mutex;
std::unordered_map<std::string, RowsetSharedPtr> _unused_rowsets;
bool _stop_bg_worker = false;
std::thread _unused_rowset_monitor_thread;
// thread to monitor snapshot expiry
std::thread _garbage_sweeper_thread;

View File

@ -67,8 +67,9 @@ static bool _cmp_tablet_by_create_time(const TabletSharedPtr& a, const TabletSha
// Constructs a TabletManager and allocates its lock array and tablet-map
// array, each with tablet_map_lock_shard_size elements.
//
// @param tablet_map_lock_shard_size  length of both arrays; must be positive.
TabletManager::TabletManager(int32_t tablet_map_lock_shard_size)
    : _tablet_map_lock_shard_size(tablet_map_lock_shard_size),
      _last_update_stat_ms(0) {
    // The shard count is used as an array length below, so it has to be > 0.
    // (The previous DCHECK_LT asserted the opposite and fired on every valid size.)
    DCHECK_GT(_tablet_map_lock_shard_size, 0);
    _tablet_map_lock_array = new RWMutex[_tablet_map_lock_shard_size];
    _tablet_map_array = new tablet_map_t[_tablet_map_lock_shard_size];
}
TabletManager::~TabletManager() {

View File

@ -37,6 +37,7 @@ public:
template<FieldType field_type>
void test_min() {
using CppType = typename CppTypeTraits<field_type>::CppType;
static const size_t kValSize = sizeof(CppType) + 1; // '1' represent the leading bool flag.
char buf[64];
std::unique_ptr<MemTracker> tracker(new MemTracker(-1));
@ -47,14 +48,14 @@ void test_min() {
RowCursorCell dst(buf);
// null
{
char val_buf[16];
char val_buf[kValSize];
*(bool*)val_buf = true;
agg->init(&dst, val_buf, true, mem_pool.get(), &agg_object_pool);
ASSERT_TRUE(*(bool*)(buf));
}
// 100
{
char val_buf[16];
char val_buf[kValSize];
*(bool*)val_buf = false;
CppType val = 100;
memcpy(val_buf + 1, &val, sizeof(CppType));
@ -65,7 +66,7 @@ void test_min() {
}
// 200
{
char val_buf[16];
char val_buf[kValSize];
*(bool*)val_buf = false;
CppType val = 200;
memcpy(val_buf + 1, &val, sizeof(CppType));
@ -76,7 +77,7 @@ void test_min() {
}
// 50
{
char val_buf[16];
char val_buf[kValSize];
*(bool*)val_buf = false;
CppType val = 50;
memcpy(val_buf + 1, &val, sizeof(CppType));
@ -87,7 +88,7 @@ void test_min() {
}
// null
{
char val_buf[16];
char val_buf[kValSize];
*(bool*)val_buf = true;
agg->update(&dst, val_buf, mem_pool.get());
ASSERT_FALSE(*(bool*)(buf));
@ -111,6 +112,7 @@ TEST_F(AggregateFuncTest, min) {
template<FieldType field_type>
void test_max() {
using CppType = typename CppTypeTraits<field_type>::CppType;
static const size_t kValSize = sizeof(CppType) + 1; // '1' represent the leading bool flag.
char buf[64];
@ -122,14 +124,14 @@ void test_max() {
RowCursorCell dst(buf);
// null
{
char val_buf[16];
char val_buf[kValSize];
*(bool*)val_buf = true;
agg->init(&dst, val_buf, true, mem_pool.get(), &agg_object_pool);
ASSERT_TRUE(*(bool*)(buf));
}
// 100
{
char val_buf[16];
char val_buf[kValSize];
*(bool*)val_buf = false;
CppType val = 100;
memcpy(val_buf + 1, &val, sizeof(CppType));
@ -140,7 +142,7 @@ void test_max() {
}
// 200
{
char val_buf[16];
char val_buf[kValSize];
*(bool*)val_buf = false;
CppType val = 200;
memcpy(val_buf + 1, &val, sizeof(CppType));
@ -151,7 +153,7 @@ void test_max() {
}
// 50
{
char val_buf[16];
char val_buf[kValSize];
*(bool*)val_buf = false;
CppType val = 50;
memcpy(val_buf + 1, &val, sizeof(CppType));
@ -162,7 +164,7 @@ void test_max() {
}
// null
{
char val_buf[16];
char val_buf[kValSize];
*(bool*)val_buf = true;
agg->update(&dst, val_buf, mem_pool.get());
ASSERT_FALSE(*(bool*)(buf));
@ -185,6 +187,7 @@ TEST_F(AggregateFuncTest, max) {
template<FieldType field_type>
void test_sum() {
using CppType = typename CppTypeTraits<field_type>::CppType;
static const size_t kValSize = sizeof(CppType) + 1; // '1' represent the leading bool flag.
char buf[64];
RowCursorCell dst(buf);
@ -196,14 +199,14 @@ void test_sum() {
// null
{
char val_buf[16];
char val_buf[kValSize];
*(bool*)val_buf = true;
agg->init(&dst, val_buf, true, mem_pool.get(), &agg_object_pool);
ASSERT_TRUE(*(bool*)(buf));
}
// 100
{
char val_buf[16];
char val_buf[kValSize];
*(bool*)val_buf = false;
CppType val = 100;
memcpy(val_buf + 1, &val, sizeof(CppType));
@ -214,7 +217,7 @@ void test_sum() {
}
// 200
{
char val_buf[16];
char val_buf[kValSize];
*(bool*)val_buf = false;
CppType val = 200;
memcpy(val_buf + 1, &val, sizeof(CppType));
@ -225,7 +228,7 @@ void test_sum() {
}
// 50
{
char val_buf[16];
char val_buf[kValSize];
*(bool*)val_buf = false;
CppType val = 50;
memcpy(val_buf + 1, &val, sizeof(CppType));
@ -236,7 +239,7 @@ void test_sum() {
}
// null
{
char val_buf[16];
char val_buf[kValSize];
*(bool*)val_buf = true;
agg->update(&dst, val_buf, mem_pool.get());
ASSERT_FALSE(*(bool*)(buf));
@ -259,6 +262,7 @@ TEST_F(AggregateFuncTest, sum) {
template<FieldType field_type>
void test_replace() {
using CppType = typename CppTypeTraits<field_type>::CppType;
static const size_t kValSize = sizeof(CppType) + 1; // '1' represent the leading bool flag.
char buf[64];
RowCursorCell dst(buf);
@ -270,14 +274,14 @@ void test_replace() {
// null
{
char val_buf[16];
char val_buf[kValSize];
*(bool*)val_buf = true;
agg->init(&dst, val_buf, true, mem_pool.get(), &agg_object_pool);
ASSERT_TRUE(*(bool*)(buf));
}
// 100
{
char val_buf[16];
char val_buf[kValSize];
*(bool*)val_buf = false;
CppType val = 100;
memcpy(val_buf + 1, &val, sizeof(CppType));
@ -288,14 +292,14 @@ void test_replace() {
}
// null
{
char val_buf[16];
char val_buf[kValSize];
*(bool*)val_buf = true;
agg->update(&dst, val_buf, mem_pool.get());
ASSERT_TRUE(*(bool*)(buf));
}
// 50
{
char val_buf[16];
char val_buf[kValSize];
*(bool*)val_buf = false;
CppType val = 50;
memcpy(val_buf + 1, &val, sizeof(CppType));

View File

@ -55,6 +55,7 @@ void set_up() {
std::vector<StorePath> paths;
paths.emplace_back(config::storage_root_path, -1);
config::min_file_descriptor_number = 1000;
config::tablet_map_shard_size = 1;
doris::EngineOptions options;
options.store_paths = paths;
@ -184,9 +185,6 @@ protected:
tablet.reset();
StorageEngine::instance()->tablet_manager()->drop_tablet(
_create_tablet.tablet_id, _create_tablet.tablet_schema.schema_hash);
while (0 == access(_tablet_path.c_str(), F_OK)) {
sleep(1);
}
ASSERT_TRUE(FileUtils::remove_all(config::storage_root_path).ok());
}
@ -293,9 +291,6 @@ protected:
tablet.reset();
StorageEngine::instance()->tablet_manager()->drop_tablet(
_create_tablet.tablet_id, _create_tablet.tablet_schema.schema_hash);
while (0 == access(_tablet_path.c_str(), F_OK)) {
sleep(1);
}
ASSERT_TRUE(FileUtils::remove_all(config::storage_root_path).ok());
}
@ -634,9 +629,6 @@ protected:
_delete_handler.finalize();
StorageEngine::instance()->tablet_manager()->drop_tablet(
_create_tablet.tablet_id, _create_tablet.tablet_schema.schema_hash);
while (0 == access(_tablet_path.c_str(), F_OK)) {
sleep(1);
}
ASSERT_TRUE(FileUtils::remove_all(config::storage_root_path).ok());
}

View File

@ -33,14 +33,23 @@
namespace doris {
namespace segment_v2 {
const std::string dname = "./ut_dir/bloom_filter_index_reader_writer_test";
class BloomFilterIndexReaderWriterTest : public testing::Test {
public:
BloomFilterIndexReaderWriterTest() { }
virtual ~BloomFilterIndexReaderWriterTest() {
virtual void SetUp() {
if (FileUtils::is_dir(dname)) {
std::set<std::string> files;
ASSERT_TRUE(FileUtils::list_dirs_files(dname, nullptr, &files, Env::Default()).ok());
for (const auto& file : files) {
Status s = Env::Default()->delete_file(dname + "/" + file);
ASSERT_TRUE(s.ok()) << s.to_string();
}
ASSERT_TRUE(Env::Default()->delete_dir(dname).ok());
}
}
};
const std::string dname = "./ut_dir/bloom_filter_index_reader_writer_test";
template<FieldType type>
void write_bloom_filter_index_file(const std::string& file_name, const void* values,
@ -54,7 +63,7 @@ void write_bloom_filter_index_file(const std::string& file_name, const void* val
std::unique_ptr<fs::WritableBlock> wblock;
fs::CreateBlockOptions opts({ fname });
Status st = fs::fs_util::block_mgr_for_ut()->create_block(opts, &wblock);
ASSERT_TRUE(st.ok());
ASSERT_TRUE(st.ok()) << st.to_string();
std::unique_ptr<BloomFilterIndexWriter> bloom_filter_index_writer;
BloomFilterOptions bf_options;