[improvment](group_commit) Refector scan wal function (#30939)
Co-authored-by: Yongqiang YANG <98214048+dataroaring@users.noreply.github.com>
This commit is contained in:
@ -39,7 +39,10 @@
|
||||
|
||||
namespace doris {
|
||||
WalManager::WalManager(ExecEnv* exec_env, const std::string& wal_dir_list)
|
||||
: _exec_env(exec_env), _stop(false), _stop_background_threads_latch(1) {
|
||||
: _exec_env(exec_env),
|
||||
_stop(false),
|
||||
_stop_background_threads_latch(1),
|
||||
_first_replay(true) {
|
||||
_wal_dirs = strings::Split(wal_dir_list, ";", strings::SkipWhitespace());
|
||||
static_cast<void>(ThreadPoolBuilder("GroupCommitReplayWalThreadPool")
|
||||
.set_min_threads(1)
|
||||
@ -76,11 +79,8 @@ Status WalManager::init() {
|
||||
RETURN_IF_ERROR(_init_wal_dirs_conf());
|
||||
RETURN_IF_ERROR(_init_wal_dirs());
|
||||
RETURN_IF_ERROR(_init_wal_dirs_info());
|
||||
for (auto wal_dir : _wal_dirs) {
|
||||
RETURN_IF_ERROR(_scan_wals(wal_dir));
|
||||
}
|
||||
return Thread::create(
|
||||
"WalMgr", "replay_wal", [this]() { static_cast<void>(this->_replay()); },
|
||||
"WalMgr", "replay_wal", [this]() { static_cast<void>(this->_replay_background()); },
|
||||
&_replay_thread);
|
||||
}
|
||||
|
||||
@ -209,6 +209,7 @@ Status WalManager::create_wal_path(int64_t db_id, int64_t table_id, int64_t wal_
|
||||
base_path = _wal_dirs_info->get_available_random_wal_dir();
|
||||
std::stringstream ss;
|
||||
ss << base_path << "/" << std::to_string(db_id) << "/" << std::to_string(table_id) << "/"
|
||||
<< _wal_version << "_" << _exec_env->master_info()->backend_id << "_"
|
||||
<< std::to_string(wal_id) << "_" << label;
|
||||
{
|
||||
std::lock_guard<std::shared_mutex> wrlock(_wal_path_lock);
|
||||
@ -232,9 +233,70 @@ Status WalManager::get_wal_path(int64_t wal_id, std::string& wal_path) {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status WalManager::_scan_wals(const std::string& wal_path) {
|
||||
size_t count = 0;
|
||||
bool exists = true;
|
||||
Status WalManager::parse_wal_path(const std::string& file_name, int64_t& version,
|
||||
int64_t& backend_id, int64_t& wal_id, std::string& label) {
|
||||
try {
|
||||
// find version
|
||||
auto pos = file_name.find("_");
|
||||
version = std::strtoll(file_name.substr(0, pos).c_str(), NULL, 10);
|
||||
// find be id
|
||||
auto substring1 = file_name.substr(pos + 1);
|
||||
pos = substring1.find("_");
|
||||
backend_id = std::strtoll(substring1.substr(0, pos).c_str(), NULL, 10);
|
||||
// find wal id
|
||||
auto substring2 = substring1.substr(pos + 1);
|
||||
pos = substring2.find("_");
|
||||
wal_id = std::strtoll(substring2.substr(0, pos).c_str(), NULL, 10);
|
||||
// find label
|
||||
label = substring2.substr(pos + 1);
|
||||
VLOG_DEBUG << "version:" << version << "backend_id:" << backend_id << ",wal_id:" << wal_id
|
||||
<< ",label:" << label;
|
||||
} catch (const std::invalid_argument& e) {
|
||||
return Status::InvalidArgument("Invalid format, {}", e.what());
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status WalManager::_load_wals() {
|
||||
std::vector<ScanWalInfo> wals;
|
||||
for (auto wal_dir : _wal_dirs) {
|
||||
WARN_IF_ERROR(_scan_wals(wal_dir, wals), fmt::format("fail to scan wal dir={}", wal_dir));
|
||||
}
|
||||
for (const auto& wal : wals) {
|
||||
bool exists = false;
|
||||
WARN_IF_ERROR(io::global_local_filesystem()->exists(wal.wal_path, &exists),
|
||||
fmt::format("fail to check exist on wal file={}", wal.wal_path));
|
||||
if (!exists) {
|
||||
continue;
|
||||
}
|
||||
LOG(INFO) << "find wal: " << wal.wal_path;
|
||||
{
|
||||
std::lock_guard<std::shared_mutex> wrlock(_wal_path_lock);
|
||||
auto it = _wal_path_map.find(wal.wal_id);
|
||||
if (it != _wal_path_map.end()) {
|
||||
LOG(INFO) << "wal_id " << wal.wal_id << " already in wal_path_map, skip it";
|
||||
continue;
|
||||
}
|
||||
_wal_path_map.emplace(wal.wal_id, wal.wal_path);
|
||||
}
|
||||
// this config is use for test p0 case in pipeline
|
||||
if (config::group_commit_wait_replay_wal_finish) {
|
||||
auto lock = std::make_shared<std::mutex>();
|
||||
auto cv = std::make_shared<std::condition_variable>();
|
||||
auto add_st = add_wal_cv_map(wal.wal_id, lock, cv);
|
||||
if (!add_st.ok()) {
|
||||
LOG(WARNING) << "fail to add wal_id " << wal.wal_id << " to wal_cv_map";
|
||||
continue;
|
||||
}
|
||||
}
|
||||
WARN_IF_ERROR(add_recover_wal(wal.db_id, wal.tb_id, wal.wal_id, wal.wal_path),
|
||||
fmt::format("Failed to add recover wal={}", wal.wal_path));
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status WalManager::_scan_wals(const std::string& wal_path, std::vector<ScanWalInfo>& res) {
|
||||
bool exists = false;
|
||||
std::vector<io::FileInfo> dbs;
|
||||
Status st = io::global_local_filesystem()->list(wal_path, false, &dbs, &exists);
|
||||
if (!st.ok()) {
|
||||
@ -249,7 +311,7 @@ Status WalManager::_scan_wals(const std::string& wal_path) {
|
||||
auto db_path = wal_path + "/" + database_id.file_name;
|
||||
st = io::global_local_filesystem()->list(db_path, false, &tables, &exists);
|
||||
if (!st.ok()) {
|
||||
LOG(WARNING) << "failed to list files for wal_dir=" << db_path
|
||||
LOG(WARNING) << "failed list files for wal_dir=" << db_path
|
||||
<< ", st=" << st.to_string();
|
||||
return st;
|
||||
}
|
||||
@ -261,49 +323,48 @@ Status WalManager::_scan_wals(const std::string& wal_path) {
|
||||
auto table_path = db_path + "/" + table_id.file_name;
|
||||
st = io::global_local_filesystem()->list(table_path, false, &wals, &exists);
|
||||
if (!st.ok()) {
|
||||
LOG(WARNING) << "failed to list files for wal_dir=" << table_path
|
||||
LOG(WARNING) << "failed list files for wal_dir=" << table_path
|
||||
<< ", st=" << st.to_string();
|
||||
return st;
|
||||
}
|
||||
if (wals.empty()) {
|
||||
continue;
|
||||
}
|
||||
std::vector<std::string> res;
|
||||
for (const auto& wal : wals) {
|
||||
auto wal_file = table_path + "/" + wal.file_name;
|
||||
res.emplace_back(wal_file);
|
||||
{
|
||||
std::lock_guard<std::shared_mutex> wrlock(_wal_path_lock);
|
||||
auto pos = wal.file_name.find("_");
|
||||
try {
|
||||
int64_t wal_id =
|
||||
std::strtoll(wal.file_name.substr(0, pos).c_str(), NULL, 10);
|
||||
_wal_path_map.emplace(wal_id, wal_file);
|
||||
int64_t db_id = std::strtoll(database_id.file_name.c_str(), NULL, 10);
|
||||
int64_t tb_id = std::strtoll(table_id.file_name.c_str(), NULL, 10);
|
||||
if (config::group_commit_wait_replay_wal_finish) {
|
||||
std::shared_ptr<std::mutex> lock = std::make_shared<std::mutex>();
|
||||
std::shared_ptr<std::condition_variable> cv =
|
||||
std::make_shared<std::condition_variable>();
|
||||
auto add_st = add_wal_cv_map(wal_id, lock, cv);
|
||||
if (!add_st.ok()) {
|
||||
LOG(WARNING) << "fail to add wal_id " << wal_id << " to wal_cv_map";
|
||||
}
|
||||
}
|
||||
RETURN_IF_ERROR(add_recover_wal(db_id, tb_id, wal_id, wal_file));
|
||||
} catch (const std::invalid_argument& e) {
|
||||
return Status::InvalidArgument("Invalid format, {}", e.what());
|
||||
}
|
||||
}
|
||||
int64_t db_id = -1;
|
||||
int64_t tb_id = -1;
|
||||
try {
|
||||
db_id = std::strtoll(database_id.file_name.c_str(), NULL, 10);
|
||||
tb_id = std::strtoll(table_id.file_name.c_str(), NULL, 10);
|
||||
} catch (const std::invalid_argument& e) {
|
||||
return Status::InvalidArgument("Invalid format, {}", e.what());
|
||||
}
|
||||
for (const auto& wal : wals) {
|
||||
int64_t version = -1;
|
||||
int64_t backend_id = -1;
|
||||
int64_t wal_id = -1;
|
||||
std::string label = "";
|
||||
auto parse_st = parse_wal_path(wal.file_name, version, backend_id, wal_id, label);
|
||||
if (!parse_st.ok()) {
|
||||
LOG(WARNING) << "fail to parse file=" << wal.file_name
|
||||
<< ",st=" << parse_st.to_string();
|
||||
continue;
|
||||
}
|
||||
auto wal_file = table_path + "/" + wal.file_name;
|
||||
struct ScanWalInfo scan_wal_info;
|
||||
scan_wal_info.wal_path = wal_file;
|
||||
scan_wal_info.db_id = db_id;
|
||||
scan_wal_info.tb_id = tb_id;
|
||||
scan_wal_info.wal_id = wal_id;
|
||||
scan_wal_info.be_id = backend_id;
|
||||
res.emplace_back(scan_wal_info);
|
||||
}
|
||||
count += res.size();
|
||||
}
|
||||
}
|
||||
LOG(INFO) << "Finish list wal_dir=" << wal_path << ", wal count=" << count;
|
||||
LOG(INFO) << "Finish list wal_dir=" << wal_path << ", wal count=" << res.size();
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status WalManager::_replay() {
|
||||
Status WalManager::_replay_background() {
|
||||
do {
|
||||
if (_stop.load()) {
|
||||
break;
|
||||
@ -313,6 +374,12 @@ Status WalManager::_replay() {
|
||||
_exec_env->master_info()->network_address.port == 0) {
|
||||
continue;
|
||||
}
|
||||
// replay residual wal,only replay once
|
||||
bool expected = true;
|
||||
if (_first_replay.compare_exchange_strong(expected, false)) {
|
||||
RETURN_IF_ERROR(_load_wals());
|
||||
}
|
||||
// replay wal of current process
|
||||
std::vector<int64_t> replay_tables;
|
||||
{
|
||||
std::lock_guard<std::shared_mutex> wrlock(_table_lock);
|
||||
@ -546,4 +613,4 @@ Status WalManager::rename_to_tmp_path(const std::string wal, int64_t table_id, i
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
} // namespace doris
|
||||
} // namespace doris
|
||||
|
||||
@ -46,6 +46,13 @@
|
||||
namespace doris {
|
||||
class WalManager {
|
||||
ENABLE_FACTORY_CREATOR(WalManager);
|
||||
struct ScanWalInfo {
|
||||
std::string wal_path;
|
||||
int64_t db_id;
|
||||
int64_t tb_id;
|
||||
int64_t wal_id;
|
||||
int64_t be_id;
|
||||
};
|
||||
|
||||
public:
|
||||
WalManager(ExecEnv* exec_env, const std::string& wal_dir);
|
||||
@ -74,6 +81,13 @@ public:
|
||||
void add_wal_queue(int64_t table_id, int64_t wal_id);
|
||||
void erase_wal_queue(int64_t table_id, int64_t wal_id);
|
||||
size_t get_wal_queue_size(int64_t table_id);
|
||||
// filename format:a_b_c_group_commit_xxx
|
||||
// a:version
|
||||
// b:be id
|
||||
// c:wal id
|
||||
// group_commit_xxx:label
|
||||
static Status parse_wal_path(const std::string& file_name, int64_t& version,
|
||||
int64_t& backend_id, int64_t& wal_id, std::string& label);
|
||||
// fot ut
|
||||
size_t get_wal_table_size(int64_t table_id);
|
||||
|
||||
@ -94,9 +108,12 @@ private:
|
||||
Status _init_wal_dirs_info();
|
||||
Status _update_wal_dir_info_thread();
|
||||
|
||||
// replay wal
|
||||
Status _scan_wals(const std::string& wal_path);
|
||||
Status _replay();
|
||||
// scan all wal files under storage path
|
||||
Status _scan_wals(const std::string& wal_path, std::vector<ScanWalInfo>& res);
|
||||
// use a background thread to do replay task
|
||||
Status _replay_background();
|
||||
// load residual wals
|
||||
Status _load_wals();
|
||||
void _stop_relay_wal();
|
||||
|
||||
public:
|
||||
@ -127,6 +144,9 @@ private:
|
||||
std::shared_mutex _wal_queue_lock;
|
||||
std::unordered_map<int64_t, std::set<int64_t>> _wal_queues;
|
||||
|
||||
int64_t _wal_version = 0;
|
||||
std::atomic<bool> _first_replay;
|
||||
|
||||
// for test relay
|
||||
// <lock, condition_variable>
|
||||
using WalCvInfo =
|
||||
@ -134,4 +154,4 @@ private:
|
||||
std::shared_mutex _wal_cv_lock;
|
||||
std::unordered_map<int64_t, WalCvInfo> _wal_cv_map;
|
||||
};
|
||||
} // namespace doris
|
||||
} // namespace doris
|
||||
|
||||
@ -91,13 +91,7 @@ void WalTable::_pick_relay_wals() {
|
||||
Status WalTable::_relay_wal_one_by_one() {
|
||||
std::vector<std::shared_ptr<WalInfo>> need_retry_wals;
|
||||
std::vector<std::shared_ptr<WalInfo>> need_delete_wals;
|
||||
while (!_replaying_queue.empty()) {
|
||||
std::shared_ptr<WalInfo> wal_info = nullptr;
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(_replay_wal_lock);
|
||||
wal_info = _replaying_queue.front();
|
||||
_replaying_queue.pop_front();
|
||||
}
|
||||
for (auto wal_info : _replaying_queue) {
|
||||
wal_info->add_retry_num();
|
||||
auto st = _replay_wal_internal(wal_info->get_wal_path());
|
||||
if (!st.ok()) {
|
||||
@ -117,6 +111,7 @@ Status WalTable::_relay_wal_one_by_one() {
|
||||
}
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(_replay_wal_lock);
|
||||
_replaying_queue.clear();
|
||||
for (auto retry_wal_info : need_retry_wals) {
|
||||
_replay_wal_map.emplace(retry_wal_info->get_wal_path(), retry_wal_info);
|
||||
}
|
||||
@ -193,9 +188,13 @@ Status WalTable::_try_abort_txn(int64_t db_id, std::string& label) {
|
||||
|
||||
Status WalTable::_replay_wal_internal(const std::string& wal) {
|
||||
LOG(INFO) << "start replay wal=" << wal;
|
||||
int64_t wal_id = 0;
|
||||
int64_t version = -1;
|
||||
int64_t backend_id = -1;
|
||||
int64_t wal_id = -1;
|
||||
std::string label = "";
|
||||
RETURN_IF_ERROR(_parse_wal_path(wal, wal_id, label));
|
||||
io::Path wal_path = wal;
|
||||
auto file_name = wal_path.filename().string();
|
||||
RETURN_IF_ERROR(WalManager::parse_wal_path(file_name, version, backend_id, wal_id, label));
|
||||
#ifndef BE_TEST
|
||||
if (!config::group_commit_wait_replay_wal_finish) {
|
||||
[[maybe_unused]] auto st = _try_abort_txn(_db_id, label);
|
||||
@ -204,19 +203,6 @@ Status WalTable::_replay_wal_internal(const std::string& wal) {
|
||||
return _replay_one_txn_with_stremaload(wal_id, wal, label);
|
||||
}
|
||||
|
||||
Status WalTable::_parse_wal_path(const std::string& wal, int64_t& wal_id, std::string& label) {
|
||||
io::Path wal_path = wal;
|
||||
auto file_name = wal_path.filename().string();
|
||||
auto pos = file_name.find("_");
|
||||
try {
|
||||
wal_id = std::strtoll(file_name.substr(0, pos).c_str(), NULL, 10);
|
||||
label = file_name.substr(pos + 1);
|
||||
} catch (const std::invalid_argument& e) {
|
||||
return Status::InvalidArgument("Invalid format, {}", e.what());
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status WalTable::_construct_sql_str(const std::string& wal, const std::string& label,
|
||||
std::string& sql_str) {
|
||||
std::string columns;
|
||||
|
||||
@ -46,7 +46,6 @@ private:
|
||||
Status _relay_wal_one_by_one();
|
||||
|
||||
Status _replay_wal_internal(const std::string& wal);
|
||||
Status _parse_wal_path(const std::string& wal, int64_t& wal_id, std::string& label);
|
||||
Status _try_abort_txn(int64_t db_id, std::string& label);
|
||||
Status _get_column_info(int64_t db_id, int64_t tb_id,
|
||||
std::map<int64_t, std::string>& column_info_map);
|
||||
|
||||
@ -59,6 +59,7 @@ public:
|
||||
_env->_master_info = new TMasterInfo();
|
||||
_env->_master_info->network_address.hostname = "host name";
|
||||
_env->_master_info->network_address.port = 1234;
|
||||
_env->_master_info->backend_id = 1001;
|
||||
_env->new_load_stream_mgr() = NewLoadStreamMgr::create_shared();
|
||||
_env->_internal_client_cache = new BrpcClientCache<PBackendService_Stub>();
|
||||
_env->_function_client_cache = new BrpcClientCache<PFunctionService_Stub>();
|
||||
@ -71,6 +72,8 @@ public:
|
||||
Status st = io::global_local_filesystem()->delete_directory(wal_dir);
|
||||
if (!st.ok()) {
|
||||
LOG(WARNING) << "fail to delete " << wal_dir.string();
|
||||
} else {
|
||||
LOG(INFO) << "delete " << wal_dir.string();
|
||||
}
|
||||
SAFE_STOP(_env->_wal_manager);
|
||||
SAFE_DELETE(_env->_function_client_cache);
|
||||
@ -81,7 +84,7 @@ public:
|
||||
void prepare() {
|
||||
Status st = io::global_local_filesystem()->create_directory(wal_dir);
|
||||
if (!st.ok()) {
|
||||
LOG(WARNING) << "create dir " << wal_dir.string();
|
||||
LOG(WARNING) << "fail to create dir " << wal_dir.string();
|
||||
}
|
||||
}
|
||||
|
||||
@ -104,47 +107,53 @@ TEST_F(WalManagerTest, recovery_normal) {
|
||||
|
||||
std::string db_id = "1";
|
||||
int64_t tb_1_id = 1;
|
||||
std::string wal_100_id = "100";
|
||||
std::string wal_101_id = "101";
|
||||
std::string wal_file_1 = "0_1001_1_group_commit_label1";
|
||||
std::string wal_file_2 = "0_1001_2_group_commit_label2";
|
||||
int64_t tb_2_id = 2;
|
||||
std::string wal_200_id = "200";
|
||||
std::string wal_201_id = "201";
|
||||
std::string wal_file_3 = "0_1001_3_group_commit_label3";
|
||||
std::string wal_file_4 = "0_1001_4_group_commit_label4";
|
||||
|
||||
bool res = std::filesystem::create_directory(wal_dir.string() + "/" + db_id);
|
||||
ASSERT_TRUE(res);
|
||||
res = std::filesystem::create_directory(wal_dir.string() + "/" + db_id + "/" +
|
||||
std::to_string(tb_1_id));
|
||||
ASSERT_TRUE(res);
|
||||
std::string wal_100 =
|
||||
wal_dir.string() + "/" + db_id + "/" + std::to_string(tb_1_id) + "/" + wal_100_id;
|
||||
std::string wal_101 =
|
||||
wal_dir.string() + "/" + db_id + "/" + std::to_string(tb_1_id) + "/" + wal_101_id;
|
||||
createWal(wal_100);
|
||||
createWal(wal_101);
|
||||
std::string wal_1 =
|
||||
wal_dir.string() + "/" + db_id + "/" + std::to_string(tb_1_id) + "/" + wal_file_1;
|
||||
std::string wal_2 =
|
||||
wal_dir.string() + "/" + db_id + "/" + std::to_string(tb_1_id) + "/" + wal_file_2;
|
||||
createWal(wal_1);
|
||||
createWal(wal_2);
|
||||
|
||||
res = std::filesystem::create_directory(wal_dir.string() + "/" + db_id + "/" +
|
||||
std::to_string(tb_2_id));
|
||||
ASSERT_TRUE(res);
|
||||
std::string wal_200 =
|
||||
wal_dir.string() + "/" + db_id + "/" + std::to_string(tb_2_id) + "/" + wal_200_id;
|
||||
std::string wal_201 =
|
||||
wal_dir.string() + "/" + db_id + "/" + std::to_string(tb_2_id) + "/" + wal_201_id;
|
||||
createWal(wal_200);
|
||||
createWal(wal_201);
|
||||
std::string wal_3 =
|
||||
wal_dir.string() + "/" + db_id + "/" + std::to_string(tb_2_id) + "/" + wal_file_3;
|
||||
std::string wal_4 =
|
||||
wal_dir.string() + "/" + db_id + "/" + std::to_string(tb_2_id) + "/" + wal_file_4;
|
||||
createWal(wal_3);
|
||||
createWal(wal_4);
|
||||
Status st = _env->wal_mgr()->init();
|
||||
if (!st.ok()) {
|
||||
LOG(WARNING) << "fail to int wal manager ";
|
||||
}
|
||||
|
||||
while (_env->wal_mgr()->get_wal_table_size(tb_1_id) > 0 ||
|
||||
_env->wal_mgr()->get_wal_table_size(tb_2_id) > 0) {
|
||||
auto count = 0;
|
||||
while (std::filesystem::exists(wal_1) || std::filesystem::exists(wal_2) ||
|
||||
std::filesystem::exists(wal_3) || std::filesystem::exists(wal_4)) {
|
||||
if (count > 30) {
|
||||
LOG(WARNING) << "wait time out";
|
||||
break;
|
||||
}
|
||||
sleep(1);
|
||||
count++;
|
||||
continue;
|
||||
}
|
||||
ASSERT_TRUE(!std::filesystem::exists(wal_100));
|
||||
ASSERT_TRUE(!std::filesystem::exists(wal_101));
|
||||
ASSERT_TRUE(!std::filesystem::exists(wal_200));
|
||||
ASSERT_TRUE(!std::filesystem::exists(wal_201));
|
||||
ASSERT_TRUE(!std::filesystem::exists(wal_1));
|
||||
ASSERT_TRUE(!std::filesystem::exists(wal_2));
|
||||
ASSERT_TRUE(!std::filesystem::exists(wal_3));
|
||||
ASSERT_TRUE(!std::filesystem::exists(wal_4));
|
||||
}
|
||||
|
||||
TEST_F(WalManagerTest, TestDynamicWalSpaceLimt) {
|
||||
|
||||
@ -60,6 +60,8 @@ private:
|
||||
int64_t db_id = 1;
|
||||
int64_t tb_id = 2;
|
||||
int64_t txn_id = 789;
|
||||
int64_t version = 0;
|
||||
int64_t backend_id = 1001;
|
||||
std::string label = "test";
|
||||
|
||||
TupleId _dst_tuple_id = 0;
|
||||
@ -197,10 +199,6 @@ void VWalScannerTest::init() {
|
||||
init_desc_table();
|
||||
static_cast<void>(io::global_local_filesystem()->create_directory(
|
||||
wal_dir + "/" + std::to_string(db_id) + "/" + std::to_string(tb_id)));
|
||||
std::string src = "./be/test/exec/test_data/wal_scanner/wal";
|
||||
std::string dst = wal_dir + "/" + std::to_string(db_id) + "/" + std::to_string(tb_id) + "/" +
|
||||
std::to_string(txn_id) + "_" + label;
|
||||
std::filesystem::copy(src, dst);
|
||||
|
||||
// Node Id
|
||||
_tnode.node_id = 0;
|
||||
@ -213,10 +211,19 @@ void VWalScannerTest::init() {
|
||||
_tnode.__isset.file_scan_node = true;
|
||||
|
||||
_env = ExecEnv::GetInstance();
|
||||
_env->_master_info = new TMasterInfo();
|
||||
_env->_master_info->network_address.hostname = "host name";
|
||||
_env->_master_info->network_address.port = backend_id;
|
||||
_env->_master_info->backend_id = 1001;
|
||||
_env->_wal_manager = WalManager::create_shared(_env, wal_dir);
|
||||
std::string base_path;
|
||||
auto st = _env->_wal_manager->_init_wal_dirs_info();
|
||||
st = _env->_wal_manager->create_wal_path(db_id, tb_id, txn_id, label, base_path);
|
||||
std::string src = "./be/test/exec/test_data/wal_scanner/wal";
|
||||
std::string dst = wal_dir + "/" + std::to_string(db_id) + "/" + std::to_string(tb_id) + "/" +
|
||||
std::to_string(version) + "_" + std::to_string(backend_id) + "_" +
|
||||
std::to_string(txn_id) + "_" + label;
|
||||
std::filesystem::copy(src, dst);
|
||||
}
|
||||
|
||||
TEST_F(VWalScannerTest, normal) {
|
||||
|
||||
Reference in New Issue
Block a user