[Fix](group commit) Fix pre allocated err handling for group commit async load and add regression test #30718
This commit is contained in:
@ -352,9 +352,9 @@ Status WalManager::add_recover_wal(int64_t db_id, int64_t table_id, int64_t wal_
|
||||
}
|
||||
table_ptr->add_wal(wal_id, wal);
|
||||
#ifndef BE_TEST
|
||||
WARN_IF_ERROR(update_wal_dir_limit(_get_base_wal_path(wal)),
|
||||
WARN_IF_ERROR(update_wal_dir_limit(get_base_wal_path(wal)),
|
||||
"Failed to update wal dir limit while add recover wal!");
|
||||
WARN_IF_ERROR(update_wal_dir_used(_get_base_wal_path(wal)),
|
||||
WARN_IF_ERROR(update_wal_dir_used(get_base_wal_path(wal)),
|
||||
"Failed to update wal dir used while add recove wal!");
|
||||
#endif
|
||||
return Status::OK();
|
||||
@ -413,7 +413,7 @@ Status WalManager::get_wal_dir_available_size(const std::string& wal_dir, size_t
|
||||
return _wal_dirs_info->get_wal_dir_available_size(wal_dir, available_bytes);
|
||||
}
|
||||
|
||||
std::string WalManager::_get_base_wal_path(const std::string& wal_path_str) {
|
||||
std::string WalManager::get_base_wal_path(const std::string& wal_path_str) {
|
||||
io::Path wal_path = wal_path_str;
|
||||
for (int i = 0; i < 3; ++i) {
|
||||
if (!wal_path.has_parent_path()) {
|
||||
@ -505,7 +505,7 @@ Status WalManager::delete_wal(int64_t table_id, int64_t wal_id, size_t block_que
|
||||
}
|
||||
}
|
||||
erase_wal_queue(table_id, wal_id);
|
||||
RETURN_IF_ERROR(update_wal_dir_pre_allocated(_get_base_wal_path(wal_path), 0,
|
||||
RETURN_IF_ERROR(update_wal_dir_pre_allocated(get_base_wal_path(wal_path), 0,
|
||||
block_queue_pre_allocated));
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
@ -84,13 +84,13 @@ public:
|
||||
std::shared_ptr<std::condition_variable>& cv);
|
||||
Status wait_replay_wal_finish(int64_t wal_id);
|
||||
Status notify_relay_wal(int64_t wal_id);
|
||||
static std::string get_base_wal_path(const std::string& wal_path_str);
|
||||
|
||||
private:
|
||||
// wal back pressure
|
||||
Status _init_wal_dirs_conf();
|
||||
Status _init_wal_dirs();
|
||||
Status _init_wal_dirs_info();
|
||||
std::string _get_base_wal_path(const std::string& wal_path_str);
|
||||
Status _update_wal_dir_info_thread();
|
||||
|
||||
// replay wal
|
||||
|
||||
@ -25,6 +25,7 @@
|
||||
#include "client_cache.h"
|
||||
#include "common/compiler_util.h"
|
||||
#include "common/config.h"
|
||||
#include "common/status.h"
|
||||
#include "runtime/exec_env.h"
|
||||
#include "runtime/fragment_mgr.h"
|
||||
#include "util/debug_points.h"
|
||||
@ -392,6 +393,8 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t db_id, int64_t table_
|
||||
// status: exec_plan_fragment result
|
||||
// st: commit txn rpc status
|
||||
// result_status: commit txn result
|
||||
DBUG_EXECUTE_IF("LoadBlockQueue._finish_group_commit_load.err_status",
|
||||
{ st = Status::InternalError(""); });
|
||||
if (status.ok() && st.ok() &&
|
||||
(result_status.ok() || result_status.is<ErrorCode::PUBLISH_TIMEOUT>())) {
|
||||
if (!config::group_commit_wait_replay_wal_finish) {
|
||||
@ -405,6 +408,9 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t db_id, int64_t table_
|
||||
std::string wal_path;
|
||||
RETURN_IF_ERROR(_exec_env->wal_mgr()->get_wal_path(txn_id, wal_path));
|
||||
RETURN_IF_ERROR(_exec_env->wal_mgr()->add_recover_wal(db_id, table_id, txn_id, wal_path));
|
||||
RETURN_IF_ERROR(_exec_env->wal_mgr()->update_wal_dir_pre_allocated(
|
||||
WalManager::get_base_wal_path(wal_path), 0,
|
||||
load_block_queue->block_queue_pre_allocated()));
|
||||
}
|
||||
std::stringstream ss;
|
||||
ss << "finish group commit, db_id=" << db_id << ", table_id=" << table_id << ", label=" << label
|
||||
@ -419,6 +425,12 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t db_id, int64_t table_
|
||||
ss << ", rows=" << state->num_rows_load_success();
|
||||
}
|
||||
LOG(INFO) << ss.str();
|
||||
DBUG_EXECUTE_IF("LoadBlockQueue._finish_group_commit_load.get_wal_back_pressure_msg", {
|
||||
std ::string msg = _exec_env->wal_mgr()->get_wal_dirs_info_string();
|
||||
LOG(INFO) << "debug promise set: " << msg;
|
||||
ExecEnv::GetInstance()->group_commit_mgr()->debug_promise.set_value(
|
||||
Status ::InternalError(msg));
|
||||
};);
|
||||
return st;
|
||||
}
|
||||
|
||||
|
||||
@ -22,6 +22,7 @@
|
||||
#include <atomic>
|
||||
#include <condition_variable>
|
||||
#include <cstdint>
|
||||
#include <future>
|
||||
#include <memory>
|
||||
#include <mutex>
|
||||
#include <shared_mutex>
|
||||
@ -170,6 +171,8 @@ public:
|
||||
const UniqueId& load_id,
|
||||
std::shared_ptr<LoadBlockQueue>& load_block_queue,
|
||||
int be_exe_version);
|
||||
std::promise<Status> debug_promise;
|
||||
std::future<Status> debug_future = debug_promise.get_future();
|
||||
|
||||
private:
|
||||
ExecEnv* _exec_env = nullptr;
|
||||
|
||||
@ -19,12 +19,14 @@
|
||||
|
||||
#include <gen_cpp/DataSinks_types.h>
|
||||
|
||||
#include <future>
|
||||
#include <shared_mutex>
|
||||
|
||||
#include "common/exception.h"
|
||||
#include "runtime/exec_env.h"
|
||||
#include "runtime/group_commit_mgr.h"
|
||||
#include "runtime/runtime_state.h"
|
||||
#include "util/debug_points.h"
|
||||
#include "util/doris_metrics.h"
|
||||
#include "vec/exprs/vexpr.h"
|
||||
#include "vec/sink/vtablet_finder.h"
|
||||
@ -260,6 +262,20 @@ Status GroupCommitBlockSink::_add_blocks(RuntimeState* state,
|
||||
}
|
||||
_is_block_appended = true;
|
||||
_blocks.clear();
|
||||
DBUG_EXECUTE_IF("LoadBlockQueue._finish_group_commit_load.get_wal_back_pressure_msg", {
|
||||
if (_load_block_queue) {
|
||||
_load_block_queue->remove_load_id(_load_id);
|
||||
}
|
||||
if (ExecEnv::GetInstance()->group_commit_mgr()->debug_future.wait_for(
|
||||
std ::chrono ::seconds(60)) == std ::future_status ::ready) {
|
||||
auto st = ExecEnv::GetInstance()->group_commit_mgr()->debug_future.get();
|
||||
ExecEnv::GetInstance()->group_commit_mgr()->debug_promise = std::promise<Status>();
|
||||
ExecEnv::GetInstance()->group_commit_mgr()->debug_future =
|
||||
ExecEnv::GetInstance()->group_commit_mgr()->debug_promise.get_future();
|
||||
LOG(INFO) << "debug future output: " << st.to_string();
|
||||
RETURN_IF_ERROR(st);
|
||||
}
|
||||
});
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
|
||||
@ -0,0 +1,5 @@
|
||||
1,1
|
||||
2,2
|
||||
3,3
|
||||
4,4
|
||||
5,5
|
||||
|
@ -0,0 +1,97 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
suite("test_group_commit_async_wal_msg_fault_injection","nonConcurrent") {
|
||||
|
||||
|
||||
def tableName = "wal_test"
|
||||
|
||||
// test successful group commit async load
|
||||
sql """ DROP TABLE IF EXISTS ${tableName} """
|
||||
|
||||
sql """
|
||||
CREATE TABLE IF NOT EXISTS ${tableName} (
|
||||
`k` int ,
|
||||
`v` int ,
|
||||
) engine=olap
|
||||
DISTRIBUTED BY HASH(`k`)
|
||||
BUCKETS 5
|
||||
properties("replication_num" = "1")
|
||||
"""
|
||||
|
||||
GetDebugPoint().clearDebugPointsForAllBEs()
|
||||
|
||||
def exception = false;
|
||||
try {
|
||||
GetDebugPoint().enableDebugPointForAllBEs("LoadBlockQueue._finish_group_commit_load.get_wal_back_pressure_msg")
|
||||
streamLoad {
|
||||
table "${tableName}"
|
||||
set 'column_separator', ','
|
||||
set 'group_commit', 'async_mode'
|
||||
unset 'label'
|
||||
file 'group_commit_wal_msg.csv'
|
||||
time 10000
|
||||
}
|
||||
assertFalse(true);
|
||||
} catch (Exception e) {
|
||||
logger.info(e.getMessage())
|
||||
assertTrue(e.getMessage().contains('pre allocated 0 Bytes'))
|
||||
exception = true;
|
||||
} finally {
|
||||
GetDebugPoint().disableDebugPointForAllBEs("LoadBlockQueue._finish_group_commit_load.get_wal_back_pressure_msg")
|
||||
assertTrue(exception)
|
||||
}
|
||||
|
||||
// test failed group commit async load
|
||||
sql """ DROP TABLE IF EXISTS ${tableName} """
|
||||
|
||||
sql """
|
||||
CREATE TABLE IF NOT EXISTS ${tableName} (
|
||||
`k` int ,
|
||||
`v` int ,
|
||||
) engine=olap
|
||||
DISTRIBUTED BY HASH(`k`)
|
||||
BUCKETS 5
|
||||
properties("replication_num" = "1")
|
||||
"""
|
||||
|
||||
GetDebugPoint().clearDebugPointsForAllBEs()
|
||||
|
||||
exception = false;
|
||||
try {
|
||||
GetDebugPoint().enableDebugPointForAllBEs("LoadBlockQueue._finish_group_commit_load.get_wal_back_pressure_msg")
|
||||
GetDebugPoint().enableDebugPointForAllBEs("LoadBlockQueue._finish_group_commit_load.err_status")
|
||||
streamLoad {
|
||||
table "${tableName}"
|
||||
set 'column_separator', ','
|
||||
set 'group_commit', 'async_mode'
|
||||
unset 'label'
|
||||
file 'group_commit_wal_msg.csv'
|
||||
time 10000
|
||||
}
|
||||
assertFalse(true);
|
||||
} catch (Exception e) {
|
||||
logger.info(e.getMessage())
|
||||
assertTrue(e.getMessage().contains('pre allocated 0 Bytes'))
|
||||
exception = true;
|
||||
} finally {
|
||||
GetDebugPoint().disableDebugPointForAllBEs("LoadBlockQueue._finish_group_commit_load.get_wal_back_pressure_msg")
|
||||
GetDebugPoint().disableDebugPointForAllBEs("LoadBlockQueue._finish_group_commit_load.err_status")
|
||||
assertTrue(exception)
|
||||
}
|
||||
|
||||
}
|
||||
Reference in New Issue
Block a user