diff --git a/be/src/olap/wal/wal_table.cpp b/be/src/olap/wal/wal_table.cpp index 0389c6fed2..14e3779748 100644 --- a/be/src/olap/wal/wal_table.cpp +++ b/be/src/olap/wal/wal_table.cpp @@ -45,10 +45,6 @@ WalTable::WalTable(ExecEnv* exec_env, int64_t db_id, int64_t table_id) } WalTable::~WalTable() {} -#ifdef BE_TEST -Status k_stream_load_exec_status; -#endif - void WalTable::add_wal(int64_t wal_id, std::string wal) { std::lock_guard lock(_replay_wal_lock); LOG(INFO) << "add replay wal=" << wal; @@ -94,19 +90,18 @@ Status WalTable::_relay_wal_one_by_one() { for (auto wal_info : _replaying_queue) { wal_info->add_retry_num(); auto st = _replay_wal_internal(wal_info->get_wal_path()); - if (!st.ok()) { - doris::wal_fail << 1; - LOG(WARNING) << "failed to replay wal=" << wal_info->get_wal_path() - << ", st=" << st.to_string(); - if (!st.is() && !st.is()) { - need_retry_wals.push_back(wal_info); - } else { - need_delete_wals.push_back(wal_info); - } - } else { + auto msg = st.msg(); + if (st.ok() || st.is() || st.is() || + st.is() || + msg.find("LabelAlreadyUsedException") != msg.npos) { LOG(INFO) << "succeed to replay wal=" << wal_info->get_wal_path() << ", st=" << st.to_string(); need_delete_wals.push_back(wal_info); + } else { + doris::wal_fail << 1; + LOG(WARNING) << "failed to replay wal=" << wal_info->get_wal_path() + << ", st=" << st.to_string(); + need_retry_wals.push_back(wal_info); } } { @@ -200,7 +195,7 @@ Status WalTable::_replay_wal_internal(const std::string& wal) { [[maybe_unused]] auto st = _try_abort_txn(_db_id, label); } #endif - return _replay_one_txn_with_stremaload(wal_id, wal, label); + return _replay_one_wal_with_streamload(wal_id, wal, label); } Status WalTable::_construct_sql_str(const std::string& wal, const std::string& label, @@ -241,6 +236,7 @@ Status WalTable::_handle_stream_load(int64_t wal_id, const std::string& wal, ctx->sql_str = sql_str; ctx->wal_id = wal_id; ctx->label = label; + ctx->need_commit_self = false; ctx->auth.token = "relay_wal"; // this is a fake, fe not check it now ctx->auth.user = "admin"; ctx->group_commit = false; @@ -263,19 +259,13 @@ Status WalTable::_handle_stream_load(int64_t wal_id, const std::string& wal, return st; } -Status WalTable::_replay_one_txn_with_stremaload(int64_t wal_id, const std::string& wal, +Status WalTable::_replay_one_wal_with_streamload(int64_t wal_id, const std::string& wal, const std::string& label) { - bool success = false; #ifndef BE_TEST - auto st = _handle_stream_load(wal_id, wal, label); - auto msg = st.msg(); - success = st.ok() || st.is() || - msg.find("LabelAlreadyUsedException") != msg.npos; + return _handle_stream_load(wal_id, wal, label); #else - success = k_stream_load_exec_status.ok(); - auto st = Status::OK(); + return Status::OK(); #endif - return success ? Status::OK() : st; } void WalTable::stop() { diff --git a/be/src/olap/wal/wal_table.h b/be/src/olap/wal/wal_table.h index f6ed3d865b..0e781cd915 100644 --- a/be/src/olap/wal/wal_table.h +++ b/be/src/olap/wal/wal_table.h @@ -50,7 +50,7 @@ private: Status _get_column_info(int64_t db_id, int64_t tb_id, std::map& column_info_map); - Status _replay_one_txn_with_stremaload(int64_t wal_id, const std::string& wal, + Status _replay_one_wal_with_streamload(int64_t wal_id, const std::string& wal, const std::string& label); Status _handle_stream_load(int64_t wal_id, const std::string& wal, const std::string& label); Status _construct_sql_str(const std::string& wal, const std::string& label, diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp index 1e231055cf..887dd856bb 100644 --- a/be/src/runtime/group_commit_mgr.cpp +++ b/be/src/runtime/group_commit_mgr.cpp @@ -209,13 +209,16 @@ Status GroupCommitTable::get_first_block_load_queue( "schema version not match, maybe a schema change is in process. Please " "retry this load manually."); } - if (!_need_plan_fragment) { - _need_plan_fragment = true; + if (!_is_creating_plan_fragment) { + _is_creating_plan_fragment = true; RETURN_IF_ERROR(_thread_pool->submit_func([&] { auto st = _create_group_commit_load(load_block_queue, be_exe_version); if (!st.ok()) { - LOG(WARNING) << "fail to create block queue,st=" << st.to_string(); + LOG(WARNING) << "create group commit load error, st=" << st.to_string(); load_block_queue.reset(); + std::unique_lock l(_lock); + _is_creating_plan_fragment = false; + _cv.notify_all(); } })); } @@ -241,13 +244,6 @@ Status GroupCommitTable::get_first_block_load_queue( Status GroupCommitTable::_create_group_commit_load( std::shared_ptr& load_block_queue, int be_exe_version) { Status st = Status::OK(); - std::unique_ptr> finish_plan_func((int*)0x01, [&](int*) { - if (!st.ok()) { - std::unique_lock l(_lock); - _need_plan_fragment = false; - _cv.notify_all(); - } - }); TStreamLoadPutRequest request; UniqueId load_id = UniqueId::gen_uid(); TUniqueId tload_id; @@ -281,13 +277,13 @@ Status GroupCommitTable::_create_group_commit_load( 10000L); if (!st.ok()) { LOG(WARNING) << "create group commit load rpc error, st=" << st.to_string(); + return st; } - RETURN_IF_ERROR(st); st = Status::create(result.status); if (!st.ok()) { LOG(WARNING) << "create group commit load error, st=" << st.to_string(); + return st; } - RETURN_IF_ERROR(st); auto schema_version = result.base_schema_version; auto is_pipeline = result.__isset.pipeline_params; auto& params = result.params; @@ -326,7 +322,7 @@ Status GroupCommitTable::_create_group_commit_load( be_exe_version)); } _load_block_queues.emplace(instance_id, load_block_queue); - _need_plan_fragment = false; + _is_creating_plan_fragment = false; _cv.notify_all(); } st = _exec_plan_fragment(_db_id, _table_id, label, txn_id, is_pipeline, params, diff --git a/be/src/runtime/group_commit_mgr.h b/be/src/runtime/group_commit_mgr.h index 5357ba208f..dc135ea106 100644 --- a/be/src/runtime/group_commit_mgr.h +++ b/be/src/runtime/group_commit_mgr.h @@ -156,7 +156,7 @@ private: std::condition_variable _cv; // fragment_instance_id to load_block_queue std::unordered_map> _load_block_queues; - bool _need_plan_fragment = false; + bool _is_creating_plan_fragment = false; }; class GroupCommitMgr { diff --git a/be/test/olap/wal/wal_manager_test.cpp b/be/test/olap/wal/wal_manager_test.cpp index d264cb621c..2dc59cec9d 100644 --- a/be/test/olap/wal/wal_manager_test.cpp +++ b/be/test/olap/wal/wal_manager_test.cpp @@ -44,7 +44,6 @@ namespace doris { extern TLoadTxnBeginResult k_stream_load_begin_result; -extern Status k_stream_load_exec_status; ExecEnv* _env = nullptr; std::filesystem::path wal_dir = std::filesystem::current_path().string() + "/wal_test"; @@ -103,7 +102,6 @@ public: TEST_F(WalManagerTest, recovery_normal) { _env->wal_mgr()->wal_limit_test_bytes = 1099511627776; - k_stream_load_exec_status = Status::OK(); std::string db_id = "1"; int64_t tb_1_id = 1; diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 850686116e..1316425df4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -1687,12 +1687,11 @@ public class FrontendServiceImpl implements FrontendService.Iface { try { loadTxnRollbackImpl(request); } catch (MetaNotFoundException e) { - String msg = "failed to rollback txn" + request.getTxnId(); - LOG.warn(msg, e); + LOG.warn("failed to rollback txn, id: {}, label: {}", request.getTxnId(), request.getLabel(), e); status.setStatusCode(TStatusCode.NOT_FOUND); status.addToErrorMsgs(e.getMessage()); } catch (UserException e) { - LOG.warn("failed to rollback txn {}: {}", request.getTxnId(), e.getMessage()); + LOG.warn("failed to rollback txn, id: {}, label: {}", request.getTxnId(), request.getLabel(), e); status.setStatusCode(TStatusCode.ANALYSIS_ERROR); status.addToErrorMsgs(e.getMessage()); } catch (Throwable e) {