[fix](group commit) handle group commit create plan error (#31757)

This commit is contained in:
meiyi
2024-03-05 18:55:03 +08:00
committed by yiguolei
parent 7b51c4aaca
commit eea9b56f69
6 changed files with 27 additions and 44 deletions

View File

@ -45,10 +45,6 @@ WalTable::WalTable(ExecEnv* exec_env, int64_t db_id, int64_t table_id)
}
WalTable::~WalTable() {}
#ifdef BE_TEST
Status k_stream_load_exec_status;
#endif
void WalTable::add_wal(int64_t wal_id, std::string wal) {
std::lock_guard<std::mutex> lock(_replay_wal_lock);
LOG(INFO) << "add replay wal=" << wal;
@ -94,19 +90,18 @@ Status WalTable::_relay_wal_one_by_one() {
for (auto wal_info : _replaying_queue) {
wal_info->add_retry_num();
auto st = _replay_wal_internal(wal_info->get_wal_path());
if (!st.ok()) {
doris::wal_fail << 1;
LOG(WARNING) << "failed to replay wal=" << wal_info->get_wal_path()
<< ", st=" << st.to_string();
if (!st.is<ErrorCode::NOT_FOUND>() && !st.is<ErrorCode::DATA_QUALITY_ERROR>()) {
need_retry_wals.push_back(wal_info);
} else {
need_delete_wals.push_back(wal_info);
}
} else {
auto msg = st.msg();
if (st.ok() || st.is<ErrorCode::PUBLISH_TIMEOUT>() || st.is<ErrorCode::NOT_FOUND>() ||
st.is<ErrorCode::DATA_QUALITY_ERROR>() ||
msg.find("LabelAlreadyUsedException") != msg.npos) {
LOG(INFO) << "succeed to replay wal=" << wal_info->get_wal_path()
<< ", st=" << st.to_string();
need_delete_wals.push_back(wal_info);
} else {
doris::wal_fail << 1;
LOG(WARNING) << "failed to replay wal=" << wal_info->get_wal_path()
<< ", st=" << st.to_string();
need_retry_wals.push_back(wal_info);
}
}
{
@ -200,7 +195,7 @@ Status WalTable::_replay_wal_internal(const std::string& wal) {
[[maybe_unused]] auto st = _try_abort_txn(_db_id, label);
}
#endif
return _replay_one_txn_with_stremaload(wal_id, wal, label);
return _replay_one_wal_with_streamload(wal_id, wal, label);
}
Status WalTable::_construct_sql_str(const std::string& wal, const std::string& label,
@ -241,6 +236,7 @@ Status WalTable::_handle_stream_load(int64_t wal_id, const std::string& wal,
ctx->sql_str = sql_str;
ctx->wal_id = wal_id;
ctx->label = label;
ctx->need_commit_self = false;
ctx->auth.token = "relay_wal"; // this is a fake, fe not check it now
ctx->auth.user = "admin";
ctx->group_commit = false;
@ -263,19 +259,13 @@ Status WalTable::_handle_stream_load(int64_t wal_id, const std::string& wal,
return st;
}
Status WalTable::_replay_one_txn_with_stremaload(int64_t wal_id, const std::string& wal,
Status WalTable::_replay_one_wal_with_streamload(int64_t wal_id, const std::string& wal,
const std::string& label) {
bool success = false;
#ifndef BE_TEST
auto st = _handle_stream_load(wal_id, wal, label);
auto msg = st.msg();
success = st.ok() || st.is<ErrorCode::PUBLISH_TIMEOUT>() ||
msg.find("LabelAlreadyUsedException") != msg.npos;
return _handle_stream_load(wal_id, wal, label);
#else
success = k_stream_load_exec_status.ok();
auto st = Status::OK();
return Status::OK();
#endif
return success ? Status::OK() : st;
}
void WalTable::stop() {

View File

@ -50,7 +50,7 @@ private:
Status _get_column_info(int64_t db_id, int64_t tb_id,
std::map<int64_t, std::string>& column_info_map);
Status _replay_one_txn_with_stremaload(int64_t wal_id, const std::string& wal,
Status _replay_one_wal_with_streamload(int64_t wal_id, const std::string& wal,
const std::string& label);
Status _handle_stream_load(int64_t wal_id, const std::string& wal, const std::string& label);
Status _construct_sql_str(const std::string& wal, const std::string& label,

View File

@ -209,13 +209,16 @@ Status GroupCommitTable::get_first_block_load_queue(
"schema version not match, maybe a schema change is in process. Please "
"retry this load manually.");
}
if (!_need_plan_fragment) {
_need_plan_fragment = true;
if (!_is_creating_plan_fragment) {
_is_creating_plan_fragment = true;
RETURN_IF_ERROR(_thread_pool->submit_func([&] {
auto st = _create_group_commit_load(load_block_queue, be_exe_version);
if (!st.ok()) {
LOG(WARNING) << "fail to create block queue,st=" << st.to_string();
LOG(WARNING) << "create group commit load error, st=" << st.to_string();
load_block_queue.reset();
std::unique_lock l(_lock);
_is_creating_plan_fragment = false;
_cv.notify_all();
}
}));
}
@ -241,13 +244,6 @@ Status GroupCommitTable::get_first_block_load_queue(
Status GroupCommitTable::_create_group_commit_load(
std::shared_ptr<LoadBlockQueue>& load_block_queue, int be_exe_version) {
Status st = Status::OK();
std::unique_ptr<int, std::function<void(int*)>> finish_plan_func((int*)0x01, [&](int*) {
if (!st.ok()) {
std::unique_lock l(_lock);
_need_plan_fragment = false;
_cv.notify_all();
}
});
TStreamLoadPutRequest request;
UniqueId load_id = UniqueId::gen_uid();
TUniqueId tload_id;
@ -281,13 +277,13 @@ Status GroupCommitTable::_create_group_commit_load(
10000L);
if (!st.ok()) {
LOG(WARNING) << "create group commit load rpc error, st=" << st.to_string();
return st;
}
RETURN_IF_ERROR(st);
st = Status::create<false>(result.status);
if (!st.ok()) {
LOG(WARNING) << "create group commit load error, st=" << st.to_string();
return st;
}
RETURN_IF_ERROR(st);
auto schema_version = result.base_schema_version;
auto is_pipeline = result.__isset.pipeline_params;
auto& params = result.params;
@ -326,7 +322,7 @@ Status GroupCommitTable::_create_group_commit_load(
be_exe_version));
}
_load_block_queues.emplace(instance_id, load_block_queue);
_need_plan_fragment = false;
_is_creating_plan_fragment = false;
_cv.notify_all();
}
st = _exec_plan_fragment(_db_id, _table_id, label, txn_id, is_pipeline, params,

View File

@ -156,7 +156,7 @@ private:
std::condition_variable _cv;
// fragment_instance_id to load_block_queue
std::unordered_map<UniqueId, std::shared_ptr<LoadBlockQueue>> _load_block_queues;
bool _need_plan_fragment = false;
bool _is_creating_plan_fragment = false;
};
class GroupCommitMgr {

View File

@ -44,7 +44,6 @@
namespace doris {
extern TLoadTxnBeginResult k_stream_load_begin_result;
extern Status k_stream_load_exec_status;
ExecEnv* _env = nullptr;
std::filesystem::path wal_dir = std::filesystem::current_path().string() + "/wal_test";
@ -103,7 +102,6 @@ public:
TEST_F(WalManagerTest, recovery_normal) {
_env->wal_mgr()->wal_limit_test_bytes = 1099511627776;
k_stream_load_exec_status = Status::OK();
std::string db_id = "1";
int64_t tb_1_id = 1;

View File

@ -1687,12 +1687,11 @@ public class FrontendServiceImpl implements FrontendService.Iface {
try {
loadTxnRollbackImpl(request);
} catch (MetaNotFoundException e) {
String msg = "failed to rollback txn" + request.getTxnId();
LOG.warn(msg, e);
LOG.warn("failed to rollback txn, id: {}, label: {}", request.getTxnId(), request.getLabel(), e);
status.setStatusCode(TStatusCode.NOT_FOUND);
status.addToErrorMsgs(e.getMessage());
} catch (UserException e) {
LOG.warn("failed to rollback txn {}: {}", request.getTxnId(), e.getMessage());
LOG.warn("failed to rollback txn, id: {}, label: {}", request.getTxnId(), request.getLabel(), e);
status.setStatusCode(TStatusCode.ANALYSIS_ERROR);
status.addToErrorMsgs(e.getMessage());
} catch (Throwable e) {