[improvement](group_commit) Rename fail wal to tmp should only use in test P0 scenario (#30959)
This commit is contained in:
@ -1119,6 +1119,7 @@ DEFINE_Int16(bitmap_serialize_version, "1");
|
||||
DEFINE_String(group_commit_wal_path, "");
|
||||
DEFINE_Int32(group_commit_replay_wal_retry_num, "10");
|
||||
DEFINE_Int32(group_commit_replay_wal_retry_interval_seconds, "5");
|
||||
DEFINE_Int32(group_commit_replay_wal_retry_interval_max_seconds, "1800");
|
||||
DEFINE_Int32(group_commit_relay_wal_threads, "10");
|
||||
// This config can be set to limit thread number in group commit request fragment thread pool.
|
||||
DEFINE_Int32(group_commit_insert_threads, "10");
|
||||
|
||||
@ -1186,6 +1186,7 @@ DECLARE_Int16(bitmap_serialize_version);
|
||||
DECLARE_String(group_commit_wal_path);
|
||||
DECLARE_Int32(group_commit_replay_wal_retry_num);
|
||||
DECLARE_Int32(group_commit_replay_wal_retry_interval_seconds);
|
||||
DECLARE_Int32(group_commit_replay_wal_retry_interval_max_seconds);
|
||||
DECLARE_mInt32(group_commit_relay_wal_threads);
|
||||
// This config can be set to limit thread number in group commit request fragment thread pool.
|
||||
DECLARE_mInt32(group_commit_insert_threads);
|
||||
|
||||
@ -32,7 +32,7 @@ std::string WalInfo::get_wal_path() {
|
||||
return _wal_path;
|
||||
}
|
||||
|
||||
int64_t WalInfo::get_retry_num() {
|
||||
int32_t WalInfo::get_retry_num() {
|
||||
return _retry_num;
|
||||
}
|
||||
|
||||
|
||||
@ -23,7 +23,7 @@ public:
|
||||
WalInfo(int64_t wal_id, std::string wal_path, int64_t retry_num, int64_t start_time_ms);
|
||||
~WalInfo() = default;
|
||||
int64_t get_wal_id();
|
||||
int64_t get_retry_num();
|
||||
int32_t get_retry_num();
|
||||
int64_t get_start_time_ms();
|
||||
std::string get_wal_path();
|
||||
void add_retry_num();
|
||||
@ -31,7 +31,7 @@ public:
|
||||
private:
|
||||
int64_t _wal_id;
|
||||
std::string _wal_path;
|
||||
int64_t _retry_num;
|
||||
int32_t _retry_num;
|
||||
int64_t _start_time_ms;
|
||||
};
|
||||
|
||||
|
||||
@ -39,14 +39,22 @@ static Status _deserialize(PBlock& block, const std::string& buf) {
|
||||
}
|
||||
|
||||
Status WalReader::init() {
|
||||
bool exists = false;
|
||||
RETURN_IF_ERROR(io::global_local_filesystem()->exists(_file_name, &exists));
|
||||
if (!exists) {
|
||||
LOG(WARNING) << "not exist wal= " << _file_name;
|
||||
return Status::NotFound("wal {} doesn't exist", _file_name);
|
||||
}
|
||||
RETURN_IF_ERROR(io::global_local_filesystem()->open_file(_file_name, &file_reader));
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status WalReader::finalize() {
|
||||
auto st = file_reader->close();
|
||||
if (!st.ok()) {
|
||||
LOG(WARNING) << "fail to close wal " << _file_name;
|
||||
if (file_reader != nullptr) {
|
||||
auto st = file_reader->close();
|
||||
if (!st.ok()) {
|
||||
LOG(WARNING) << "fail to close wal " << _file_name << " st= " << st.to_string();
|
||||
}
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
@ -59,16 +59,15 @@ void WalTable::_pick_relay_wals() {
|
||||
std::vector<std::string> need_replay_wals;
|
||||
std::vector<std::string> need_erase_wals;
|
||||
for (const auto& [wal_path, wal_info] : _replay_wal_map) {
|
||||
if (wal_info->get_retry_num() >= config::group_commit_replay_wal_retry_num) {
|
||||
if (config::group_commit_wait_replay_wal_finish &&
|
||||
wal_info->get_retry_num() >= config::group_commit_replay_wal_retry_num) {
|
||||
LOG(WARNING) << "failed to replay wal=" << wal_path << " after retry "
|
||||
<< wal_info->get_retry_num() << " times";
|
||||
[[maybe_unused]] auto st = _exec_env->wal_mgr()->rename_to_tmp_path(
|
||||
wal_path, _table_id, wal_info->get_wal_id());
|
||||
if (config::group_commit_wait_replay_wal_finish) {
|
||||
auto notify_st = _exec_env->wal_mgr()->notify_relay_wal(wal_info->get_wal_id());
|
||||
if (!notify_st.ok()) {
|
||||
LOG(WARNING) << "notify wal " << wal_info->get_wal_id() << " fail";
|
||||
}
|
||||
auto notify_st = _exec_env->wal_mgr()->notify_relay_wal(wal_info->get_wal_id());
|
||||
if (!notify_st.ok()) {
|
||||
LOG(WARNING) << "notify wal " << wal_info->get_wal_id() << " fail";
|
||||
}
|
||||
need_erase_wals.push_back(wal_path);
|
||||
continue;
|
||||
@ -153,8 +152,16 @@ bool WalTable::_need_replay(std::shared_ptr<WalInfo> wal_info) {
|
||||
return true;
|
||||
}
|
||||
#ifndef BE_TEST
|
||||
auto replay_interval = pow(2, wal_info->get_retry_num()) *
|
||||
config::group_commit_replay_wal_retry_interval_seconds * 1000;
|
||||
auto replay_interval = 0;
|
||||
if (wal_info->get_retry_num() >= config::group_commit_replay_wal_retry_num) {
|
||||
replay_interval = pow(2, config::group_commit_replay_wal_retry_num) *
|
||||
config::group_commit_replay_wal_retry_interval_seconds * 1000 +
|
||||
(wal_info->get_retry_num() - config::group_commit_replay_wal_retry_num) *
|
||||
config::group_commit_replay_wal_retry_interval_max_seconds * 1000;
|
||||
} else {
|
||||
replay_interval = pow(2, wal_info->get_retry_num()) *
|
||||
config::group_commit_replay_wal_retry_interval_seconds * 1000;
|
||||
}
|
||||
return UnixMillis() - wal_info->get_start_time_ms() >= replay_interval;
|
||||
#else
|
||||
return true;
|
||||
|
||||
Reference in New Issue
Block a user